From 400d9dc2b3b292de7175631c8fd1fb8f1f4fc7fb Mon Sep 17 00:00:00 2001
From: Greg Quinn <greg.quinn@ssec.wisc.edu>
Date: Wed, 28 Oct 2009 21:03:49 +0000
Subject: [PATCH] Checking in the "grouper" line of development

---
 agg_new.py           | 226 +++++++++++++++++++++++++++++++++++++++++++
 aggregate.py         |   6 +-
 grouper.py           | 122 +++++++++++++++++++++++
 intercal.condor      |   2 +-
 intercal_db.py       | 150 ++++++++++++++++++++++------
 make_groupers.sql    |   5 +
 make_jobs.py         |  35 +++++++
 make_jobs.sql        |  14 ++-
 modis_l1b.py         |   2 +-
 ready_watch.py       |   7 +-
 ready_watch_apsps.py | 148 ++++++++++++++++++++++++++++
 setup.sql            |  84 ++++++++++++----
 wrapper_funnel.py    |  27 ++++--
 13 files changed, 763 insertions(+), 65 deletions(-)
 create mode 100644 agg_new.py
 create mode 100644 grouper.py
 create mode 100644 make_groupers.sql
 create mode 100644 make_jobs.py
 create mode 100644 ready_watch_apsps.py

diff --git a/agg_new.py b/agg_new.py
new file mode 100644
index 0000000..610d819
--- /dev/null
+++ b/agg_new.py
@@ -0,0 +1,226 @@
+import numpy
+import os
+import sys
+
+from pyhdf.SD import SD, SDC
+from util import HdfWriter
+
+# helper functions to allow the classes below to be used with either
+# one or two dimensions
+
+def histogram_init_helper(num_bins, dtype):
+
+    if type(num_bins) == tuple:
+        return numpy.zeros(num_bins, dtype)
+    else:
+        return numpy.zeros((num_bins,), dtype)
+
+def histogram_helper(arr, **kwargs):
+
+    if type(arr) == tuple:
+        return numpy.histogram2d(arr[0], arr[1], **kwargs)[0]
+    else:
+        return numpy.histogram(arr, **kwargs)[0]
+
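+# note on the 2-D case: the Stats objects binned by Earth location
+# below are constructed with tuple arguments, so arr here is a
+# (lat, lon) pair and bins/range are pairs as well, e.g. (purely
+# illustrative):
+#
+#   histogram_helper((lat, lon), bins=(180, 360),
+#                    range=((-90.0, 90.0), (-180.0, 180.0)))
+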
+# class for building up a histogram in parts
+class Histogram:
+
+    def __init__(self, min_value, max_value, num_bins):
+
+        self.range = (min_value, max_value)
+        self.num_bins = num_bins
+        self.hist = histogram_init_helper(num_bins, numpy.int32)
+
+    def add(self, arr):
+
+        self.hist += histogram_helper(arr,
+                                      bins=self.num_bins,
+                                      range=self.range)
+        
+    def get(self):
+
+        return self.hist
+
+# class for taking statistics on some value (e.g., brightness
+# temperature differences between AIRS and MODIS) in terms of some
+# basis (e.g. Earth location or instrument scan angle). like
+# Histogram, designed for building up these stats via successive
+# calls to add()
+class Stats:
+
+    def __init__(self, min_basis, max_basis, num_bins):
+
+        self.num_bins = num_bins
+        self.range = (min_basis, max_basis)
+        self.hist = Histogram(min_basis, max_basis, num_bins)
+        self.sum = histogram_init_helper(num_bins, numpy.float64)
+        self.squared_sum = histogram_init_helper(num_bins, numpy.float64)
+
+    def add(self, basis, values):
+
+        self.hist.add(basis)
+
+        self.sum += histogram_helper(basis,
+                                     bins=self.num_bins,
+                                     range=self.range,
+                                     weights=values)
+
+        squared_values = values * values
+        self.squared_sum += histogram_helper(basis,
+                                             bins=self.num_bins,
+                                             range=self.range,
+                                             weights=squared_values)
+
+    def get(self):
+
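+        # per-bin mean and standard deviation from the accumulated
+        # sums: mean = sum(w)/n, std = sqrt(sum(w^2)/n - mean^2).
+        # bins that never received any samples have n == 0 and come
+        # out as NaN (with a numpy divide warning)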
+        n = self.hist.get()
+        mean = self.sum / n
+        std = numpy.sqrt((self.squared_sum / n) - (mean * mean))
+        return n, mean, std
+
+# helper class to save repetition for all the per-band data
+# structures needed below
+class PerBandHelper:
+
+    def __init__(self, ctor, *args):
+
+        self.data = [ctor(*args) for band in range(16)]
+
+    def __getitem__(self, key):
+
+        return self.data[key]
+
+    def get(self):
+
+        return self.data
+
+# table of available groupers, mirroring grouper.py. it is defined
+# after the Histogram and Stats classes so those references resolve;
+# it is not otherwise used below yet
+grouper_definitions = {
+    'AIRS_BT_Hist': (Histogram, 200.0, 350.0, 150),
+    'MODIS_BT_Hist': (Histogram, 200.0, 350.0, 150),
+    'BT_Diff_Hist': (Histogram, -10.0, 10.0, 100),
+    'AIRS_BT_By_Location': (Stats,
+                            (-90.0, 90.0),
+                            (-180.0, 180.0),
+                            (180, 360)),
+    'BT_Diff_By_Location': (Stats,
+                            (-90.0, 90.0),
+                            (-180.0, 180.0),
+                            (180, 360)),
+    'BT_Diff_By_AIRS_BT': (Stats, 200.0, 350.0, 150),
+    'BT_Diff_By_Scan_Angle': (Stats, -55.0, 55.0, 110),
+    'BT_Diff_By_Solar_Zenith': (Stats, 0.0, 180.0, 180)
+}
+
+# verify command line inputs
+if len(sys.argv) != 3:
+    print 'usage: python %s <input_dir> <output_file>' % sys.argv[0]
+    sys.exit(1)
+input_dir = sys.argv[1]
+output_file = sys.argv[2]
+
+# set up per-band histogram and stats objects
+airs_bt_hist = PerBandHelper(Histogram, 200.0, 350.0, 150)
+modis_bt_hist = PerBandHelper(Histogram, 200.0, 350.0, 150)
+bt_diff_hist = PerBandHelper(Histogram, -10.0, 10.0, 100)
+latlon_airs_stats = PerBandHelper(Stats,
+                                  (-90.0, 90.0),
+                                  (-180.0, 180.0),
+                                  (180, 360))
+latlon_diff_stats = PerBandHelper(Stats,
+                                  (-90.0, 90.0),
+                                  (-180.0, 180.0),
+                                  (180, 360))
+bt_diff_stats = PerBandHelper(Stats, 200.0, 350.0, 150)
+scan_ang_diff_stats = PerBandHelper(Stats, -55.0, 55.0, 110)
+sol_zen_diff_stats = PerBandHelper(Stats, 0.0, 180.0, 180)
+
+# loop over each granule in the input directory
+for gran in range(1, 241):
+
+    print 'Processing granule %d' % gran
+
+    # make sure all the needed files are present
+    airs_file = '%s/intercal.a2m.2008.12.27.%03d.hdf' % (input_dir, gran)
+    modis_file = '%s/intercal.m2a.2008.12.27.%03d.hdf' % (input_dir, gran)
+    meta_file = '%s/intercal.meta.2008.12.27.%03d.hdf' % (input_dir, gran)
+    if not (os.path.exists(airs_file) and
+            os.path.exists(modis_file) and
+            os.path.exists(meta_file)):
+        print '  Granule %d missing' % gran
+        continue
+
+    # grab AIRS data
+    airs_sd = SD(airs_file)
+    airs_bt = airs_sd.select('AIRS_Brightness_Temp').get()
+    airs_flags = airs_sd.select('AIRS_Flags').get()
+
+    # MODIS data
+    modis_sd = SD(modis_file)
+    modis_bt = modis_sd.select('MODIS_Brightness_Temp').get()
+    modis_flags = modis_sd.select('MODIS_Flags').get()
+
+    # "meta" data
+    meta_sd = SD(meta_file)
+    lats = meta_sd.select('Latitude').get()
+    lons = meta_sd.select('Longitude').get()
+    scan_ang = meta_sd.select('Scan_Angle').get()
+    sol_zen = meta_sd.select('Solar_Zenith').get()
+
+    # AIRS - MODIS BTs
+    bt_diff = airs_bt - modis_bt
+
+    # we'll mask off all pixels where the data is suspect for either
+    # instrument
+    # FIXME: should nan BTs be discarded or zeroed?
+    mask = numpy.logical_and(airs_flags == 0, modis_flags == 0)
+    mask = numpy.logical_and(mask, numpy.isnan(bt_diff) == False)
+
+    # now update our per-band statistics
+    for band in range(16):
+
+        # set up some shorthands for convenience and to avoid
+        # duplicate mask applications
+        m = mask[band]
+        abt = airs_bt[band][m]
+        mbt = modis_bt[band][m]
+        btd = bt_diff[band][m]
+        la = lats[m]
+        lo = lons[m]
+        sa = scan_ang[m]
+        sz = sol_zen[m]
+
+        # do the actual updates
+        airs_bt_hist[band].add(abt)
+        modis_bt_hist[band].add(mbt)
+        bt_diff_hist[band].add(btd)
+        latlon_airs_stats[band].add((la, lo), abt)
+        latlon_diff_stats[band].add((la, lo), btd)
+        bt_diff_stats[band].add(abt, btd)
+        scan_ang_diff_stats[band].add(sa, btd)
+        sol_zen_diff_stats[band].add(sz, btd)
+
+# output results to HDF
+# FIXME: handle the limits more coherently
+writer = HdfWriter(output_file)
+writer.write_seq('AIRS_BT_Hist',
+                 [o.get() for o in airs_bt_hist.get()],
+                 min_bucket=200.0, max_bucket=350.0)
+writer.write_seq('MODIS_BT_Hist',
+                 [o.get() for o in modis_bt_hist.get()],
+                 min_bucket=200.0, max_bucket=350.0)
+writer.write_seq('BT_Diff_Hist',
+                 [o.get() for o in bt_diff_hist.get()],
+                 min_bucket=-10.0, max_bucket=10.0)
+writer.write_seq('AIRS_BT_By_Location',
+                 [o.get()[1] for o in latlon_airs_stats.get()])
+writer.write_seq('BT_Diff_By_Location',
+                 [o.get()[1] for o in latlon_diff_stats.get()])
+writer.write_seq('BT_Diff_By_AIRS_BT',
+                 [o.get()[1] for o in bt_diff_stats.get()],
+                 min_bucket=200.0, max_bucket=350.0)
+writer.write_seq('BT_Diff_By_Scan_Angle',
+                 [o.get()[1] for o in scan_ang_diff_stats.get()],
+                 min_bucket=-55.0, max_bucket=55.0)
+writer.write_seq('BT_Diff_By_Solar_Zenith',
+                 [o.get()[1] for o in sol_zen_diff_stats.get()],
+                 min_bucket=0.0, max_bucket=180.0)
diff --git a/aggregate.py b/aggregate.py
index f00546d..00f2f41 100644
--- a/aggregate.py
+++ b/aggregate.py
@@ -123,9 +123,9 @@ for gran in range(1, 241):
     print 'Processing granule %d' % gran
 
     # make sure all the needed files are present
-    airs_file = '%s/a2m_%03d.hdf' % (input_dir, gran)
-    modis_file = '%s/m2a_%03d.hdf' % (input_dir, gran)
-    meta_file = '%s/meta_%03d.hdf' % (input_dir, gran)
+    airs_file = '%s/intercal.a2m.2008.12.31.%03d.hdf' % (input_dir, gran)
+    modis_file = '%s/intercal.m2a.2008.12.31.%03d.hdf' % (input_dir, gran)
+    meta_file = '%s/intercal.meta.2008.12.31.%03d.hdf' % (input_dir, gran)
     if not (os.path.exists(airs_file) and
             os.path.exists(modis_file) and
             os.path.exists(meta_file)):
diff --git a/grouper.py b/grouper.py
new file mode 100644
index 0000000..957faa4
--- /dev/null
+++ b/grouper.py
@@ -0,0 +1,122 @@
+import glob
+import numpy
+import sys
+
+# helper functions to allow the classes below to be used with either
+# one or two dimensions
+
+def histogram_init_helper(num_bins, dtype):
+
+    if type(num_bins) == tuple:
+        return numpy.zeros(num_bins, dtype)
+    else:
+        return numpy.zeros((num_bins,), dtype)
+
+def histogram_helper(arr, **kwargs):
+
+    if type(arr) == tuple:
+        return numpy.histogram2d(arr[0], arr[1], **kwargs)[0]
+    else:
+        return numpy.histogram(arr, **kwargs)[0]
+
+# class for building up a histogram in parts
+class Histogram:
+
+    def __init__(self, min_value, max_value, num_bins):
+
+        self.range = (min_value, max_value)
+        self.num_bins = num_bins
+        self.hist = histogram_init_helper(num_bins, numpy.int32)
+
+    def add(self, arr):
+
+        self.hist += histogram_helper(arr,
+                                      bins=self.num_bins,
+                                      range=self.range)
+
+    def get(self):
+
+        return self.hist
+
+# class for taking statistics on some value (e.g., brightness
+# temperature differences between AIRS and MODIS) in terms of some
+# basis (e.g. Earth location or instrument scan angle). like
+# Histogram, designed for building up these stats via successive
+# calls to add()
+class Stats:
+
+    def __init__(self, min_basis, max_basis, num_bins):
+
+        self.num_bins = num_bins
+        self.range = (min_basis, max_basis)
+        self.hist = Histogram(min_basis, max_basis, num_bins)
+        self.sum = histogram_init_helper(num_bins, numpy.float64)
+        self.squared_sum = histogram_init_helper(num_bins, numpy.float64)
+
+    def add(self, basis, values):
+
+        self.hist.add(basis)
+
+        self.sum += histogram_helper(basis,
+                                     bins=self.num_bins,
+                                     range=self.range,
+                                     weights=values)
+
+        squared_values = values * values
+        self.squared_sum += histogram_helper(basis,
+                                             bins=self.num_bins,
+                                             range=self.range,
+                                             weights=squared_values)
+
+    def get(self):
+
+        n = self.hist.get()
+        mean = self.sum / n
+        std = numpy.sqrt((self.squared_sum / n) - (mean * mean))
+        return n, mean, std
+
+# helper class to save repetition for all the per-band data
+# structures needed below
+class PerBandHelper:
+
+    def __init__(self, ctor, *args):
+
+        self.data = [ctor(*args) for band in range(16)]
+
+    def __getitem__(self, key):
+
+        return self.data[key]
+
+    def get(self):
+
+        return self.data
+
+grouper_definitions = {
+    'AIRS_BT_Hist': (Histogram, 200.0, 350.0, 150),
+    'MODIS_BT_Hist': (Histogram, 200.0, 350.0, 150),
+    'BT_Diff_Hist': (Histogram, -10.0, 10.0, 100),
+    'AIRS_BT_By_Location': (Stats,
+                            (-90.0, 90.0),
+                            (-180.0, 180.0),
+                            (180, 360)),
+    'BT_Diff_By_Location': (Stats,
+                            (-90.0, 90.0),
+                            (-180.0, 180.0),
+                            (180, 360)),
+    'BT_Diff_By_AIRS_BT': (Stats, 200.0, 350.0, 150),
+    'BT_Diff_By_Scan_Angle': (Stats, -55.0, 55.0, 110),
+    'BT_Diff_By_Solar_Zenith': (Stats, 0.0, 180.0, 180)
+}
+
+# check command line
+if len(sys.argv) < 5:
+    print 'usage: python %s <year> <month> <day> <grouper> ...' % sys.argv[0]
+    sys.exit(1)
+year = int(sys.argv[1])
+month = int(sys.argv[2])
+day = int(sys.argv[3])
+grouper_names = sys.argv[4:]
+
+# loop over each granule in the input directory
+files = glob.glob('intercal.*.%04d.%02d.%02d.*.hdf' % (year, month, day))
+files.sort()
+for gran in range(1, 241):
+    # TODO: the per-granule grouper processing for this line of
+    # development has not been written yet
+    pass
diff --git a/intercal.condor b/intercal.condor
index eda30c4..e5b9b53 100644
--- a/intercal.condor
+++ b/intercal.condor
@@ -16,4 +16,4 @@ log = log.$(Cluster).$(Process)
 
 notification = never
 
-queue 200
+queue
diff --git a/intercal_db.py b/intercal_db.py
index 9c49733..855e1e6 100644
--- a/intercal_db.py
+++ b/intercal_db.py
@@ -4,13 +4,24 @@
 import pgdb
 import time
 
+# helper for connecting to the database
+def connect():
+
+    return pgdb.connect(database='intercal')
+
+# helper for getting the current UTC time in a PostgreSQL-friendly
+# format
+def get_time():
+
+    return time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime())
+
 # claim a job for forward processing. claiming a job just sets its
 # status as 'running' in the DB, but it is done in such a way as to
 # ensure that only one process can claim a given job. returns an
 # (airs_date, airs_chunk) pair. the processing system name ('funnel'
 # or 'peate' for starters) and an identifier within that system are
 # input parameters
-def get_forward_job(system, execution_id):
+def get_forward_job(system, system_id):
 
     # connect to DB
     con = connect()
@@ -24,23 +35,23 @@ def get_forward_job(system, execution_id):
 
         # find the most recent 'ready' job
         cur.execute("select airs_date, airs_chunk " +
-                        "from jobs " +
+                        "from intercal_jobs " +
                         "where status = 'ready' " +
                         "order by airs_date desc, airs_chunk desc " +
                         "limit 1")
         airs_date, airs_chunk = cur.fetchone()
 
         # try and claim it
-        if get_job(airs_date, airs_chunk, system, execution_id, con):
+        if get_intercal_job(airs_date, airs_chunk, system, system_id, con):
             return airs_date, airs_chunk
 
     # again, this really shouldn't happen
     raise Exception('could not get a job to execute')
 
 # claim the job with the given data and chunk number. system and
-# execution_id are the same as for get_forward_job(). con can be
+# system_id are the same as for get_forward_job(). con can be
 # set if a DB connection is already established
-def get_job(airs_date, airs_chunk, system, execution_id, con=None):
+def get_intercal_job(airs_date, airs_chunk, system, system_id, con=None):
 
     # connect to the DB if not already
     if con == None:
@@ -54,7 +65,7 @@ def get_job(airs_date, airs_chunk, system, execution_id, con=None):
     # no longer satisfied). if the other transaction is rolled back,
     # then our transaction will update one row
     cur = con.cursor()
-    cur.execute("update jobs set status = 'running' " +
+    cur.execute("update intercal_jobs set status = 'running' " +
                     "where airs_date = '%s' and " % airs_date +
                           "airs_chunk = %s and " % airs_chunk +
                           "status = 'ready'")
@@ -64,11 +75,11 @@ def get_job(airs_date, airs_chunk, system, execution_id, con=None):
     # we got the claim. now add a row to the executions table to
     # document our process
     start_time = get_time()
-    cur.execute("insert into executions (airs_date, airs_chunk, " +
-                                         "system, id, " +
-                                         "start_time) " +
+    cur.execute("insert into intercal_executions (airs_date, airs_chunk, " +
+                                                 "system, id, " +
+                                                 "start_time) " +
                     "values ('%s', %s, " % (airs_date, airs_chunk) +
-                            "'%s', '%s'," % (system, execution_id) +
+                            "'%s', '%s'," % (system, system_id) +
                             "'%s')" % start_time)
 
     # don't forgot to commit!
@@ -76,29 +87,18 @@ def get_job(airs_date, airs_chunk, system, execution_id, con=None):
     return True
 
 # for marking a job as successfuly completed
-def job_done(airs_date, airs_chunk):
+def intercal_done(airs_date, airs_chunk):
 
-    job_done_or_failed(airs_date, airs_chunk, 'done')
+    intercal_done_or_failed(airs_date, airs_chunk, 'done')
 
 # for marking a job as failed
-def job_failed(airs_date, airs_chunk):
+def intercal_failed(airs_date, airs_chunk):
 
-    job_done_or_failed(airs_date, airs_chunk, 'failed')
-
-# helper for connecting to the database
-def connect():
-
-    return pgdb.connect(database='intercal')
-
-# helper for getting the current UTC time in a PostgreSQL-friendly
-# format
-def get_time():
-
-    return time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime())
+    intercal_done_or_failed(airs_date, airs_chunk, 'failed')
 
 # helper for updating the database once execution is completed,
 # either with success or failure
-def job_done_or_failed(airs_date, airs_chunk, status):
+def intercal_done_or_failed(airs_date, airs_chunk, status):
 
     # connect to the DB
     con = connect()
@@ -106,14 +106,108 @@ def job_done_or_failed(airs_date, airs_chunk, status):
 
     # record the end-time of this execution
     stop_time = get_time()
-    cur.execute("update executions set stop_time = '%s' " % stop_time +
+    cur.execute("update intercal_executions " +
+                    "set stop_time = '%s' " % stop_time +
                     "where airs_date = '%s' and " % airs_date +
                           "airs_chunk = '%s'" % airs_chunk)
 
     # update the jobs table so this job is no longer marked 'running'
-    cur.execute("update jobs set status = '%s' " % status +
+    cur.execute("update intercal_jobs " +
+                    "set status = '%s' " % status +
                     "where airs_date = '%s' and " % airs_date +
                           "airs_chunk = %s" % airs_chunk)
 
     # don't forget to commit!
     con.commit()
+
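+# claim all grouper work for a just-completed chunk. the single
+# "update ... returning" below flips every 'idle' grouper_jobs row
+# for this (airs_date, airs_chunk) to 'running' and hands back the
+# claimed grouper_ids, so no two processes can claim the same grouper
+# job. a grouper_executions row is then recorded and tied to each
+# claimed job through grouper_execution_mappings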
+def start_groupers(airs_date, airs_chunk, system, system_id):
+
+    # connect to DB
+    con = connect()
+    cur = con.cursor()
+
+    # set all grouper jobs for this chunk to running
+    cur.execute("update grouper_jobs set status = 'running' " +
+                    "where airs_date = '%s' and " % airs_date +
+                          "airs_chunk = %s and " % airs_chunk +
+                          "status = 'idle' " +
+                    "returning grouper_id")
+    grouper_ids = [row[0] for row in cur]
+
+    # create a grouper_execution row
+    start_time = get_time()
+    cur.execute("insert into grouper_executions (type, system, id, " +
+                                                "start_time) " +
+                    "values ('forward', " +
+                            "'%s', '%s', " % (system,system_id) +
+                            "'%s') " % start_time +
+                    "returning execution_id")
+    execution_id = cur.fetchone()[0]
+
+    # create grouper execution mappings
+    # FIXME: it's tricky to follow what's going on with the
+    # format specifiers here
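+    # (the %s fields bound with the python % operator are filled in
+    # right away; only the trailing %s is left for executemany to
+    # bind with each grouper_id)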
+    cur.executemany("insert into grouper_execution_mappings (execution_id, " +
+                                                            "airs_date, " +
+                                                            "airs_chunk, " +
+                                                            "grouper_id) " +
+                        "values (%s, " % execution_id +
+                                "'%s', %s, " % (airs_date, airs_chunk) +
+                                "%s)",
+                    ((grouper_id,) for grouper_id in grouper_ids))
+
+    # updates are done now, so commit
+    con.commit()
+
+    # get the names of the groupers so we can return them
+    # FIXME: better way to do this?
+    grouper_names = []
+    for grouper_id in grouper_ids:
+        cur.execute("select name from groupers " +
+                        "where grouper_id = %s" % grouper_id)
+        grouper_names.append(cur.fetchone()[0])
+
+    return execution_id, grouper_ids, grouper_names
+
+def groupers_done(airs_date, airs_chunk, execution_id, grouper_ids):
+
+    groupers_done_or_failed(airs_date, airs_chunk,
+                            execution_id,
+                            grouper_ids,
+                            'done')
+
+def groupers_failed(airs_date, airs_chunk, execution_id, grouper_ids):
+
+    groupers_done_or_failed(airs_date, airs_chunk,
+                            execution_id,
+                            grouper_ids,
+                            'failed')
+
+def groupers_done_or_failed(airs_date, airs_chunk,
+                            execution_id,
+                            grouper_ids,
+                            status):
+
+    # connect to the DB
+    con = connect()
+    cur = con.cursor()
+
+    # record the end-time of this execution
+    stop_time = get_time()
+    cur.execute("update grouper_executions " +
+                    "set stop_time = '%s' " % stop_time +
+                    "where execution_id = %s" % execution_id)
+
+    # update the grouper_jobs table so these grouper jobs are no
+    # longer marked 'running'
+    # FIXME: it's tricky to follow what's going on with the
+    # format specifiers here
+    cur.executemany("update grouper_jobs " +
+                        "set status = '%s' " % status +
+                        "where airs_date = '%s' and " % airs_date +
+                              "airs_chunk = %s and " % airs_chunk +
+                              "grouper_id = %s",
+                    ((grouper_id,) for grouper_id in grouper_ids))
+
+    # don't forget to commit!
+    con.commit()
diff --git a/make_groupers.sql b/make_groupers.sql
new file mode 100644
index 0000000..852d639
--- /dev/null
+++ b/make_groupers.sql
@@ -0,0 +1,5 @@
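+-- seed the groupers table with placeholder names for testing; these
+-- presumably stand in for real grouper names such as those listed in
+-- grouper.py's grouper_definitions (e.g. 'AIRS_BT_Hist')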
+insert into groupers (name) values ('Test1');
+insert into groupers (name) values ('Test2');
+insert into groupers (name) values ('Test3');
+insert into groupers (name) values ('Test4');
+insert into groupers (name) values ('Test5');
diff --git a/make_jobs.py b/make_jobs.py
new file mode 100644
index 0000000..b3917a3
--- /dev/null
+++ b/make_jobs.py
@@ -0,0 +1,35 @@
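+# python counterpart of make_jobs.sql: extend intercal_jobs up
+# through the current day, then do the same for grouper_jobs once per
+# row in the groupers table (the %s placeholders in the second query
+# are filled in with each grouper_id). like make_jobs.sql, this is
+# meant to be run daily to keep the job tables current
+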
+import pgdb
+
+con = pgdb.connect(database='intercal')
+cur = con.cursor()
+
+query = """
+insert into intercal_jobs
+    select '2003-01-01'::date + s1.i, s2.i, 'held'
+        from generate_series(0, current_date - '2003-01-01') as s1(i),
+             generate_series(0, 23) as s2(i)
+        where s1.i > coalesce((select max(airs_date) - '2003-01-01'
+                                   from intercal_jobs),
+                              -1)
+"""
+cur.execute(query)
+print 'Inserted %s rows into intercal_jobs' % cur.rowcount
+
+query = """
+insert into grouper_jobs
+    select '2003-01-01'::date + s1.i, s2.i, %s, 'idle'
+        from generate_series(0, current_date - '2003-01-01') as s1(i),
+             generate_series(0, 23) as s2(i)
+        where s1.i > coalesce((select max(airs_date) - '2003-01-01'
+                                   from grouper_jobs
+                                   where grouper_id = %s),
+                              -1)
+"""
+cur.execute("select grouper_id, name from groupers")
+for grouper_id, grouper_name in cur:
+    cur2 = con.cursor()
+    cur2.execute(query % (grouper_id, grouper_id))
+    print 'Inserted %s rows into grouper_jobs for grouper %s' % (cur2.rowcount,
+                                                                 grouper_name)
+
+con.commit()
diff --git a/make_jobs.sql b/make_jobs.sql
index 51b690a..9b370f5 100644
--- a/make_jobs.sql
+++ b/make_jobs.sql
@@ -3,9 +3,19 @@
 -- granules up through the current day. this should be run daily to
 -- keep the jobs table current
 
-insert into jobs
+insert into intercal_jobs
     select '2003-01-01'::date + s1.i, s2.i, 'held'
         from generate_series(0, current_date - '2003-01-01') as s1(i),
              generate_series(0, 23) as s2(i)
-        where s1.i > coalesce((select max(airs_date) - '2003-01-01' from jobs),
+        where s1.i > coalesce((select max(airs_date) - '2003-01-01'
+                                   from intercal_jobs),
+                              -1);
+
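+-- seed grouper_jobs the same way. the grouper_id is hard-coded to 1
+-- here; make_jobs.py performs the equivalent insert for every row in
+-- the groupers table instead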
+insert into grouper_jobs
+    select '2003-01-01'::date + s1.i, s2.i, 1, 'idle'
+        from generate_series(0, current_date - '2003-01-01') as s1(i),
+             generate_series(0, 23) as s2(i)
+        where s1.i > coalesce((select max(airs_date) - '2003-01-01'
+                                   from grouper_jobs
+                                   where grouper_id = 1),
                               -1);
diff --git a/modis_l1b.py b/modis_l1b.py
index 949504a..017e2c4 100644
--- a/modis_l1b.py
+++ b/modis_l1b.py
@@ -66,7 +66,7 @@ def get_ancillary_file(file_type, t):
 
     # get the type id for Steve's database and the filename prefix
     # based on the file type
-    d = {'A': (8, 'PM1ATTRN'), 'E': (9, 'PM1EPHND')}
+    d = {'A': (8, 'PM1ATTNR'), 'E': (9, 'PM1EPHND')}
     db_type, prefix = d[file_type]
 
     # see if we already have this ancillary file
diff --git a/ready_watch.py b/ready_watch.py
index 694d569..4483264 100644
--- a/ready_watch.py
+++ b/ready_watch.py
@@ -24,7 +24,7 @@ i_con = pgdb.connect(database='intercal')
 i_cur = i_con.cursor()
 
 # determine the current range of AIRS granules to consider
-i_cur.execute("select min(airs_date), max(airs_date) from jobs")
+i_cur.execute("select min(airs_date), max(airs_date) from intercal_jobs")
 min_date, max_date = (parse_time(s) for s in i_cur.fetchone())
 num_days = ((max_date - min_date) / 86400) + 1
 
@@ -86,7 +86,8 @@ eph_mask[offsets] = True
 # below is dependent on the fact that we are processing the AIRS
 # granules an hour at a time
 ready_list = []
-i_cur.execute("select airs_date, airs_chunk from jobs where status = 'held'")
+i_cur.execute("select airs_date, airs_chunk from intercal_jobs " +
+                  "where status = 'held'")
 for airs_date, airs_chunk in i_cur:
 
     # determine the nominal time for this AIRS granule
@@ -133,7 +134,7 @@ for airs_date, airs_chunk in i_cur:
 
 # update the InterCal processing DB with the jobs we now consider
 # ready
-i_cur.executemany("update jobs set status = 'ready' " +
+i_cur.executemany("update intercal_jobs set status = 'ready' " +
                       "where airs_date = %s and airs_chunk = %s",
                   ready_list)
 i_con.commit()
diff --git a/ready_watch_apsps.py b/ready_watch_apsps.py
new file mode 100644
index 0000000..cdf09c3
--- /dev/null
+++ b/ready_watch_apsps.py
@@ -0,0 +1,148 @@
+
+# script to check Steve's database for InterCal "jobs" (groups of
+# 10 AIRS granules) that are ready for processing. besides the AIRS
+# L1B files themselves, we also need MODIS L1A files and the attitude
+# and ephemeris files needed to process them into L1B. this needs to
+# be run frequently to keep the database current
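+#
+# note that unlike ready_watch.py this variant never updates the
+# InterCal jobs table: it covers a hard-coded date range and simply
+# prints the date and chunk of every hour whose inputs are not all
+# available yet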
+
+import calendar
+import numpy
+import pgdb
+import subprocess
+import time
+
+# helper for parsing PostgreSQL dates
+def parse_time(time_str):
+    return calendar.timegm(time.strptime(time_str, '%Y-%m-%d'))
+
+# helper for getting the nominal granule time from an AIRS filename
+def parse_airs_filename(filename):
+    return (calendar.timegm(time.strptime(filename[5:15], '%Y.%m.%d')) +
+            360 * (int(filename[16:19]) - 1))
+
+# helper for going from "AIRS offset" to a date and chunk number
+def print_time_and_chunk(t):
+    print '%s %s' % (time.strftime('%Y-%m-%d', time.gmtime(t)),
+                     t % 86400 / (60 * 60))
+
+# hard-code range of AIRS dates to consider
+min_date = parse_time('2007-07-01')
+max_date = parse_time('2008-06-30')
+num_days = ((max_date - min_date) / 86400) + 1
+
+# get available AIRS times via 'dmsls'
+airs_mask = numpy.zeros((num_days * 240,), numpy.bool)
+cmd = ('dmsls', 'AIRS.*')
+p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
+for line in p.stdout:
+    if line[:4] != 'AIRS':
+        continue
+    t = parse_airs_filename(line)
+    if (t < min_date) or (t >= (max_date + 86400)):
+        continue
+    airs_mask[(t - min_date) / (6 * 60)] = True
+p.wait()
+
+# connect to Steve's database
+p_con = pgdb.connect(host='twister', database='PEATE_v3', password='abc')
+p_cur = p_con.cursor()
+
+# get times of all other needed files from the DB, sorted
+min_date_str = time.strftime('%Y.%m.%d', time.gmtime(min_date))
+max_date_str = time.strftime('%Y.%m.%d', time.gmtime(max_date))
+p_cur.execute("select fn " +
+                  "from files " +
+                  "where type = 41 and " +
+                        "date >= '%s' and " % min_date_str +
+                        "date <= '%s' " % max_date_str +
+                  "order by fn")
+airs_times = numpy.array([parse_airs_filename(row[0]) for row in p_cur]) 
+p_cur.execute("select unix_t " +
+                  "from files " +
+                  "where type < 4 and " +
+                        "date >= '%s' and " % min_date_str +
+                        "date <= '%s' " % max_date_str +
+                  "order by unix_t")
+mod_times = numpy.array(p_cur.fetchall()).flatten()
+cut_date_min = min_date
+cut_date_max = max_date + (24 * 60 * 60)
+p_cur.execute("select unix_t " +
+                  "from anc_files " +
+                  "where type = 8 and " +
+                        "unix_t >= %s and " % cut_date_min +
+                        "unix_t < %s " % cut_date_max +
+                  "order by unix_t")
+att_times = numpy.array(p_cur.fetchall()).flatten()
+cut_date_min -= (24 * 60 * 60)
+p_cur.execute("select unix_t " +
+                  "from anc_files " +
+                  "where type = 9 and " +
+                        "unix_t >= %s and " % cut_date_min +
+                        "unix_t < %s " % cut_date_max +
+                  "order by unix_t")
+eph_times = numpy.array(p_cur.fetchall()).flatten()
+
+# create masks of which MODIS, attitude, and ephemeris files are
+# available
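+# each mask has one slot per nominal file time: MODIS L1A every 5
+# minutes (num_days * 288), attitude every 2 hours (num_days * 12),
+# and ephemeris once per day (num_days + 1 slots, offset by half a
+# day)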
+
+offsets = (mod_times - min_date) / (5 * 60)
+mod_mask = numpy.zeros((num_days * 288,), numpy.bool)
+mod_mask[offsets] = True
+
+offsets = (att_times - min_date) / (2 * 60 * 60)
+att_mask = numpy.zeros((num_days * 12,), numpy.bool)
+att_mask[offsets] = True
+
+offsets = (eph_times - min_date + (12 * 60 * 60)) / (24 * 60 * 60)
+eph_mask = numpy.zeros((num_days + 1,), numpy.bool)
+eph_mask[offsets] = True
+
+# now loop through InterCal jobs labeled 'held' and for each check
+# if all its needed dependencies are available. note that the code
+# below is dependent on the fact that we are processing the AIRS
+# granules an hour at a time
+for airs_time in range(min_date, max_date + (24 * 60 * 60), (60 * 60)):
+
+    # determine the nominal time for this AIRS granule
+    airs_date = airs_time / (24 * 60 * 60) * (24 * 60 * 60)
+    airs_chunk = airs_time % (24 * 60 * 60) / (60 * 60)
+
+    # check for the AIRS file itself
+    airs_offset = (airs_time - min_date) / (6 * 60)
+    if not airs_mask[airs_offset]:
+        print_time_and_chunk(airs_time)
+        continue
+
+    # we need the 14 MODIS L1A files starting with the same nominal
+    # time as the first AIRS file (note that AIRS nominal times are
+    # about 5.5 minutes late compared to MODIS)
+    num_mod_files = 14
+    mod_offset_ini = (airs_time - min_date) / (5 * 60)
+    mod_offset_fin = mod_offset_ini + num_mod_files
+    if not mod_mask[mod_offset_ini:mod_offset_fin].all():
+        print_time_and_chunk(airs_time)
+        continue
+
+    # unless the MODIS files cross a two-hour boundary, they all fit
+    # between 2 attitude files. otherwise, we'll need 3
+    if (airs_chunk % 2) == 1:
+        num_att_files = 3
+    else:
+        num_att_files = 2
+    att_offset_ini = (airs_time - min_date) / (2 * 60 * 60)
+    att_offset_fin = att_offset_ini + num_att_files
+    if not att_mask[att_offset_ini:att_offset_fin].all():
+        print_time_and_chunk(airs_time)
+        continue
+
+    # unless the MODIS files cross noon UTC, they all fit between 2
+    # ephemeris files. otherwise, we'll need 3
+    if airs_chunk == 11:
+        num_eph_files = 3
+    else:
+        num_eph_files = 2
+    eph_offset_ini = (airs_time - min_date + (12 * 60 * 60)) / (24 * 60 * 60)
+    eph_offset_fin = eph_offset_ini + num_eph_files
+    if not eph_mask[eph_offset_ini:eph_offset_fin].all():
+        print_time_and_chunk(airs_time)
+        continue
diff --git a/setup.sql b/setup.sql
index 6e2dcc5..c143fe1 100644
--- a/setup.sql
+++ b/setup.sql
@@ -1,35 +1,79 @@
 
 -- SQL to initialize the InterCal processing database
 
--- the jobs table contains one row for every hour of AIRS data. new
--- jobs are added via make_jobs.sql. when jobs are first added they
--- are placed in the 'held' status. when the ready_watch.py script
--- determines that all needed data is available, the job is moved to
--- 'ready'. it can then be claimed (set 'running') by a process in
--- the cluster or on funnel. after processing, it is set to 'failed'
--- or 'done' depending on the result. these later 3 stages are
--- managed via the intercal_db.py module
-create domain status text check (value = 'held' or
-                                 value = 'ready' or
-                                 value = 'running' or
-                                 value = 'failed' or
-                                 value = 'done');
-create table jobs (
+create domain system text check (value = 'funnel' or value = 'peate');
+create domain chunk integer check (value >= 0 and value <= 23);
+
+-- the intercal_jobs table contains one row for every hour of AIRS
+-- data. new jobs are added via make_jobs.sql. when jobs are first
+-- added they are placed in the 'held' status. when the
+-- ready_watch.py script determines that all needed data is
+-- available, the job is moved to 'ready'. it can then be claimed
+-- (set 'running') by a process in the cluster or on funnel. after
+-- processing, it is set to 'failed' or 'done' depending on the
+-- result. these last 3 stages are managed via the intercal_db.py
+-- module
+create domain intercal_status text check (value = 'held' or
+                                          value = 'ready' or
+                                          value = 'running' or
+                                          value = 'failed' or
+                                          value = 'done');
+create table intercal_jobs (
     airs_date  date,
-    airs_chunk integer check (airs_chunk >= 0 and airs_chunk <= 23),
-    status     status not null,
+    airs_chunk chunk,
+    status     intercal_status not null,
     primary key (airs_date, airs_chunk)
 );
 
 -- each time a process executes an InterCal job, it records an entry
--- in the executions table for posterity
-create domain system text check (value = 'funnel' or value = 'peate');
-create table executions (
+-- in the intercal_executions table
+create table intercal_executions (
     airs_date   date,
     airs_chunk  integer,
     system      system not null,
     id          text not null,
     start_time  timestamp not null,
     stop_time   timestamp,
-    foreign key (airs_date, airs_chunk) references jobs
+    foreign key (airs_date, airs_chunk) references intercal_jobs
+);
+
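+-- the groupers table names each aggregation ("grouper") to be run
+-- over the matched AIRS/MODIS data. grouper_jobs carries one row per
+-- (airs hour, grouper); grouper_executions records each attempt to
+-- run a batch of groupers, and grouper_execution_mappings ties an
+-- execution back to the individual grouper jobs it covered. these
+-- tables are driven by start_groupers() and friends in intercal_db.py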
+create table groupers (
+    grouper_id serial primary key,
+    name       text unique not null
+);
+
+-- TODO: is 'merged' needed?
+create domain grouper_status text check (value = 'idle' or
+                                         value = 'running' or
+                                         value = 'done' or
+                                         value = 'failed');
+create table grouper_jobs (
+    airs_date  date,
+    airs_chunk chunk,
+    grouper_id integer references groupers,
+    status     grouper_status not null,
+    primary key (airs_date, airs_chunk, grouper_id)
+);
+
+create domain grouper_type text check (value = 'forward' or
+                                       value = 'catch-up');
+create table grouper_executions (
+    execution_id serial primary key,
+    type         grouper_type not null,
+    system       system not null,
+    id           text not null,
+    start_time   timestamp not null,
+    stop_time    timestamp
+);
+
+create table grouper_execution_mappings (
+    execution_id integer references grouper_executions,
+    airs_date    date,
+    airs_chunk   chunk,
+    grouper_id   integer,
+    foreign key (airs_date, airs_chunk, grouper_id) references grouper_jobs
+);
+
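+-- presumably one row per AIRS date scheduled for a 'catch-up'
+-- grouper run; nothing in this commit references this table yet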
+create table grouper_catch_ups (
+    airs_date date primary key
 );
diff --git a/wrapper_funnel.py b/wrapper_funnel.py
index 97ee8af..e55fbab 100644
--- a/wrapper_funnel.py
+++ b/wrapper_funnel.py
@@ -23,14 +23,17 @@ def get_time():
 if len(sys.argv) != 2:
     print 'usage: %s <execution_id>' % sys.argv[0]
     sys.exit(1)
-execution_id = sys.argv[1]
+condor_id = sys.argv[1]
 
 # switch into the condor execute directory so our disk space will be
 # cleaned up when we're done
 os.chdir(os.environ['_CONDOR_SCRATCH_DIR'])
 
 # get a job to work on
-airs_date, airs_chunk = intercal_db.get_forward_job('funnel', execution_id)
+airs_date, airs_chunk = intercal_db.get_forward_job('funnel', condor_id)
+
+# DEBUG: the driver and dmsput calls below are commented out for now,
+# so force a success code and let the rest of the workflow run
+return_code = 0
 
 # run the driver
 year = airs_date[0:4]
@@ -45,11 +48,11 @@ cmd = ('python',
        year, month, day,
        str(start_gran_num), str(num_granules),
        output_prefix)
-return_code = subprocess.call(cmd)
+#return_code = subprocess.call(cmd)
 
 # if the driver exited with failure, bail
 if return_code != 0:
-    intercal_db.job_failed(airs_date, airs_chunk)
+    intercal_db.intercal_failed(airs_date, airs_chunk)
     raise Exception('driver failed with return code %s' % return_code)
 
 # upload the files into the DMS; if any of these uploads fail,
@@ -61,10 +64,20 @@ for i in range(10):
                                                 year, month, day,
                                                 start_gran_num + i)
         cmd = ('dmsput', filename)
-        return_code = subprocess.call(cmd)
+        #return_code = subprocess.call(cmd)
         if return_code != 0:
-            intercal_db.job_failed(airs_date, airs_chunk)
+            intercal_db.intercal_failed(airs_date, airs_chunk)
             raise Exception(' failed: %s' % return_code)
 
 # looks like everything worked!
-intercal_db.job_done(airs_date, airs_chunk)
+intercal_db.intercal_done(airs_date, airs_chunk)
+
+# now on to the groupers
+# TODO: do something real here
+(execution_id,
+ grouper_ids,
+ grouper_names) = intercal_db.start_groupers(airs_date, airs_chunk,
+                                             'funnel', condor_id)
+for grouper_name in grouper_names:
+    print 'GROUPER: %s' % grouper_name
+intercal_db.groupers_done(airs_date, airs_chunk, execution_id, grouper_ids)
-- 
GitLab