diff --git a/airs2modis.py b/airs2modis.py
index 53c26a685e57434e7f4f1948350034eead449ea2..7e05ca1bf65d41159fb8426164de27dc8af55f38 100644
--- a/airs2modis.py
+++ b/airs2modis.py
@@ -1,3 +1,7 @@
+
+# script for producing AIRS radiances in terms of MODIS spectral
+# response functions
+
 import calendar
 import flags
 import numpy
diff --git a/airs2modis_pre.py b/airs2modis_pre.py
index 39dcda841848e4d6f234aff1b36903ff57f82c23..beaeb541f998bfa8c6e52080ad0b67cbb7a93069 100644
--- a/airs2modis_pre.py
+++ b/airs2modis_pre.py
@@ -1,3 +1,4 @@
+
 # This script uses an AIRS channel properties files and a MODIS
 # spectral response file to produce an intermediate airs2modis.hdf
 # file. This intermediate file is then used as input by the
diff --git a/airs_l1b.py b/airs_l1b.py
index 32b74fa28370fcd9e2fbb7411b214d0b019673d1..294a74f8c6f2af85de0f880957bafde66c863ec1 100644
--- a/airs_l1b.py
+++ b/airs_l1b.py
@@ -1,3 +1,6 @@
+
+# some utilities for dealing with AIRS granules
+
 import PEATE_lib_v3
 import glob
 import pgdb
diff --git a/bright.py b/bright.py
index 76a19f7b0ccfc4f7acf112a4ef8dfe9c8293224f..97b9fa76cc6ea2e2ca2ef266f7d6bb079caae72d 100644
--- a/bright.py
+++ b/bright.py
@@ -1,3 +1,4 @@
+
 # General brightness temperature routines
 
 import numpy
diff --git a/driver.py b/driver.py
index 83bc35a6ec5ea3db5fdd7c372f43437a32a6ef69..92b4b5077eb572cbe13b77bcff9592ad18e3886c 100644
--- a/driver.py
+++ b/driver.py
@@ -1,3 +1,7 @@
+
+# script for conducting InterCal processing for a set of contiguous
+# AIRS granules
+
 import airs_l1b
 import calendar
 import modis_l1b
@@ -8,6 +12,7 @@ import time
 
 from pyhdf.SD import SD
 
+# function for processing a single AIRS granule
 def process(airs_date, airs_gran_num):
 
     # ensure we have the AIRS file
diff --git a/flags.py b/flags.py
index 0da9372ce52c572a7f8d9c592bf9f91eb63c490f..35e1f967b08f2f097a6787862bb34371a2c1d5ad 100644
--- a/flags.py
+++ b/flags.py
@@ -1,4 +1,5 @@
-# Module with values for flags output in the intercal system
+
+# Module with values for flags output in the InterCal system
 
 INVALID = 1
 LOW = 2
diff --git a/get_meta.py b/get_meta.py
index d8c5c1053d957798ea473a4ddd147a1c126d74b0..5243fe132157604f477b1a447ebc9147ca59c862 100644
--- a/get_meta.py
+++ b/get_meta.py
@@ -1,3 +1,7 @@
+
+# pulls information out of an AIRS granule that might be useful in
+# analyzing InterCal results
+
 import sys
 
 from pyhdf.SD import SD, SDC
diff --git a/intercal.condor b/intercal.condor
index 9bf10afa1dd75cbc78c37cdb98e049dbce866760..15157ad6652b66dcc83891006fcbef12c817e175 100644
--- a/intercal.condor
+++ b/intercal.condor
@@ -1,3 +1,6 @@
+
+# submit file for InterCal forward processing on funnel
+
 universe = vanilla
 
 initial_dir = /data/gregq/condor/submit
diff --git a/intercal_db.py b/intercal_db.py
index 1194f3b4859eed65f12135300464096c724ef8b9..9c49733034c52b00b901ea5735eee3c9c4d72a8f 100644
--- a/intercal_db.py
+++ b/intercal_db.py
@@ -1,12 +1,28 @@
+
+# utilities for interacting with the InterCal processing database
+
 import pgdb
 import time
 
+# claim a job for forward processing. claiming a job just sets its
+# status as 'running' in the DB, but it is done in such a way as to
+# ensure that only one process can claim a given job. returns an
+# (airs_date, airs_chunk) pair. the processing system name ('funnel'
+# or 'peate' for starters) and an identifier within that system are
+# input parameters
 def get_forward_job(system, execution_id):
 
+    # connect to DB
     con = connect()
     cur = con.cursor()
+
+    # repeatedly find the most recent job that's in the 'ready'
+    # state and try to claim it. this should never take more than a
+    # few iterations since there are (at least for now) at most 8
+    # concurrent processes doing forward-processing
     for i in range(16):
 
+        # find the most recent 'ready' job
         cur.execute("select airs_date, airs_chunk " +
                         "from jobs " +
                         "where status = 'ready' " +
@@ -14,16 +30,29 @@
                         "limit 1")
         airs_date, airs_chunk = cur.fetchone()
 
+        # try and claim it
         if get_job(airs_date, airs_chunk, system, execution_id, con):
             return airs_date, airs_chunk
 
+    # again, this really shouldn't happen
     raise Exception('could not get a job to execute')
 
 
+# claim the job with the given date and chunk number. system and
+# execution_id are the same as for get_forward_job(). con can be
+# set if a DB connection is already established
 def get_job(airs_date, airs_chunk, system, execution_id, con=None):
+    # connect to the DB if not already
     if con == None:
         con = connect()
 
+    # we avoid race conditions here by relying on PostgreSQL's "read
+    # committed" isolation level. the query below will block if
+    # another uncommitted transaction has already written this job
+    # to 'running'. if such a transaction commits, then our
+    # transaction will update no rows (since the status clause is
+    # no longer satisfied). if the other transaction is rolled back,
+    # then our transaction will update one row
     cur = con.cursor()
     cur.execute("update jobs set status = 'running' " +
                     "where airs_date = '%s' and " % airs_date +
@@ -32,6 +61,8 @@
     if cur.rowcount == 0:
         return False
 
+    # we got the claim. now add a row to the executions table to
+    # document our process
     start_time = get_time()
     cur.execute("insert into executions (airs_date, airs_chunk, " +
                     "system, id, " +
@@ -40,13 +71,16 @@
                     "'%s', '%s'," % (system, execution_id) +
                     "'%s')" % start_time)
 
+    # don't forget to commit!
     con.commit()
     return True
 
 
+# for marking a job as successfully completed
 def job_done(airs_date, airs_chunk):
     job_done_or_failed(airs_date, airs_chunk, 'done')
 
 
+# for marking a job as failed
 def job_failed(airs_date, airs_chunk):
     job_done_or_failed(airs_date, airs_chunk, 'failed')
@@ -66,13 +100,20 @@ def get_time():
 # either with success or failure
 def job_done_or_failed(airs_date, airs_chunk, status):
 
-    stop_time = get_time()
+    # connect to the DB
     con = connect()
     cur = con.cursor()
+
+    # record the end-time of this execution
+    stop_time = get_time()
     cur.execute("update executions set stop_time = '%s' " % stop_time +
                     "where airs_date = '%s' and " % airs_date +
                     "airs_chunk = '%s'" % airs_chunk)
+
+    # update the jobs table so this job is no longer marked 'running'
     cur.execute("update jobs set status = '%s' " % status +
                     "where airs_date = '%s' and " % airs_date +
                     "airs_chunk = %s" % airs_chunk)
+
+    # don't forget to commit!
     con.commit()
diff --git a/make_jobs.sql b/make_jobs.sql
index 91b92496892db43ac1955040d75a197790611fd8..51b690a1352a0003f956906eff8284bac29860af 100644
--- a/make_jobs.sql
+++ b/make_jobs.sql
@@ -1,3 +1,8 @@
+
+-- SQL to add jobs to the InterCal processing DB for all AIRS
+-- granules up through the current day. this should be run daily to
+-- keep the jobs table current
+
 insert into jobs
     select '2003-01-01'::date + s1.i, s2.i, 'held'
     from generate_series(0, current_date - '2003-01-01') as s1(i),
diff --git a/modis2airs.py b/modis2airs.py
index 2e93460725e888fe1f56cb0af38f270bcb1000d4..a02576b1b02ab95413935210aadf0c946795d671 100644
--- a/modis2airs.py
+++ b/modis2airs.py
@@ -1,3 +1,7 @@
+
+# first stage of bringing MODIS radiances into terms of AIRS spatial
+# footprints (after collocation)
+
 import numpy
 import sys
 
diff --git a/modis2airs_collect.py b/modis2airs_collect.py
index ee1d863584e65036cdbec88673470067977889b0..7262c9401f2f1d982f9bb9c5b035894b9f7c35a2 100644
--- a/modis2airs_collect.py
+++ b/modis2airs_collect.py
@@ -1,3 +1,7 @@
+
+# final stage of bringing MODIS radiances into terms of AIRS spatial
+# footprints
+
 import numpy
 import sys
 
diff --git a/modis_bright.py b/modis_bright.py
index 0d0d3451a88665ee7f6e21cd26caf447e9f270db..a270c029add65239248111c271905a671bbef457 100644
--- a/modis_bright.py
+++ b/modis_bright.py
@@ -1,3 +1,4 @@
+
 # Compute MODIS brightness temperature from radiance. So far, this ONLY
 # applies to Aqua
 
diff --git a/modis_l1b.py b/modis_l1b.py
index 794bdbb88879c0e3280722ec9fdf8be91abc1e60..949504a17778b743abe65780ee133f986337c9b0 100644
--- a/modis_l1b.py
+++ b/modis_l1b.py
@@ -1,3 +1,6 @@
+
+# utilities for processing MODIS L1A files into L1B
+
 import PEATE_lib_v3
 import calendar
 import glob
diff --git a/ready_watch.py b/ready_watch.py
index 23e1111616421782595b5ca1330d33c3c8b53da1..1924c975f892905ce402e912d29e2c7097a004d0 100644
--- a/ready_watch.py
+++ b/ready_watch.py
@@ -1,13 +1,24 @@
+
+# script to check Steve's database for InterCal "jobs" (groups of
+# 10 AIRS granules) that are ready for processing. besides the AIRS
+# L1B files themselves, we also need MODIS L1A files and the attitude
+# and ephemeris files needed to process them into L1B. this needs to
+# be run frequently to keep the database current
+
 import calendar
 import numpy
 import pgdb
 import time
 
+# helper for parsing PostgreSQL dates
 def parse_time(time_str):
     return calendar.timegm(time.strptime(time_str, '%Y-%m-%d'))
 
+# connect to Steve's database
 con = pgdb.connect(host='twister', database='PEATE_v3', password='abc')
 cur = con.cursor()
+
+# get times of all needed files from the DB, sorted
 cur.execute('select unix_t from files where type < 4 order by unix_t')
 mod_times = numpy.array(cur.fetchall()).flatten()
 cur.execute('select unix_t from anc_files where type = 8 order by unix_t')
@@ -15,13 +26,18 @@ att_times = numpy.array(cur.fetchall()).flatten()
 cur.execute('select unix_t from anc_files where type = 9 order by unix_t')
 eph_times = numpy.array(cur.fetchall()).flatten()
 
+# connect to the InterCal processing DB
 con = pgdb.connect(database='intercal')
 cur = con.cursor()
 
+# determine the current range of AIRS granules to consider
 cur.execute('select min(airs_date), max(airs_date) from jobs')
 min_date, max_date = (parse_time(s) for s in cur.fetchone())
 num_days = ((max_date - min_date) / 86400) + 1
 
+# create masks of which MODIS, attitude, and ephemeris files are
+# available
+
 offsets = (mod_times - min_date) / (5 * 60)
 mod_mask = numpy.zeros((num_days * 288,), numpy.bool)
 mod_mask[offsets] = True
@@ -34,18 +50,28 @@
 offsets = (eph_times - min_date + (12 * 60 * 60)) / (24 * 60 * 60)
 eph_mask = numpy.zeros((num_days,), numpy.bool)
 eph_mask[offsets] = True
 
+# now loop through InterCal jobs labeled 'held' and for each check
+# if all its needed dependencies are available. note that the code
+# below is dependent on the fact that we are processing the AIRS
+# granules an hour at a time
 ready_list = []
 cur.execute("select airs_date, airs_chunk from jobs where status = 'held'")
 for airs_date, airs_chunk in cur:
+    # determine the nominal time for this AIRS granule
     airs_time = parse_time(airs_date) + (airs_chunk * 60 * 60)
 
+    # we need the 14 MODIS L1A files starting with the same nominal
+    # time as the first AIRS file (note that AIRS nominal times are
+    # about 5.5 minutes late compared to MODIS)
     num_mod_files = 14
     mod_offset_ini = (airs_time - min_date) / (5 * 60)
     mod_offset_fin = mod_offset_ini + num_mod_files
     if not mod_mask[mod_offset_ini:mod_offset_fin].all():
         continue
 
+    # unless the MODIS files cross a two-hour boundary, they all fit
+    # between 2 attitude files. otherwise, we'll need 3
     if (airs_chunk % 2) == 1:
         num_att_files = 3
     else:
@@ -55,6 +81,8 @@ for airs_date, airs_chunk in cur:
     if not att_mask[att_offset_ini:att_offset_fin].all():
         continue
 
+    # unless the MODIS files cross noon UTC, they all fit between 2
+    # ephemeris files. otherwise, we'll need 3
     if airs_chunk == 11:
         num_eph_files = 3
     else:
@@ -64,8 +92,11 @@ for airs_date, airs_chunk in cur:
     if not eph_mask[eph_offset_ini:eph_offset_fin].all():
         continue
 
+    # if we get here, the job's ready
     ready_list.append((airs_date, airs_chunk))
 
+# update the InterCal processing DB with the jobs we now consider
+# ready
 cur.executemany("update jobs set status = 'ready' " +
                 "where airs_date = %s and airs_chunk = %s", ready_list)
 
diff --git a/setup.sql b/setup.sql
index 6651b86ccb2611c6b06553b1898e6828018317a3..6e2dcc5b368aca818cca907c8467d554f1b62b0e 100644
--- a/setup.sql
+++ b/setup.sql
@@ -1,4 +1,14 @@
+-- SQL to initialize the InterCal processing database
+
+-- the jobs table contains one row for every hour of AIRS data. new
+-- jobs are added via make_jobs.sql. when jobs are first added they
+-- are placed in the 'held' status. when the ready_watch.py script
+-- determines that all needed data is available, the job is moved to
+-- 'ready'. it can then be claimed (set 'running') by a process in
+-- the cluster or on funnel. after processing, it is set to 'failed'
+-- or 'done' depending on the result. these latter 3 stages are
+-- managed via the intercal_db.py module
 
 create domain status text check (value = 'held' or
                                  value = 'ready' or
                                  value = 'running' or
@@ -11,6 +21,8 @@ create table jobs (
     primary key (airs_date, airs_chunk)
 );
 
+-- each time a process executes an InterCal job, it records an entry
+-- in the executions table for posterity
 create domain system text check (value = 'funnel' or value = 'peate');
 create table executions (
     airs_date date,
diff --git a/util.py b/util.py
index a9046d6c27f2fcb96c867578672ef76e8de5f4dc..e2071d11f19a219a528d9344c4a5d5dd323ae3a7 100644
--- a/util.py
+++ b/util.py
@@ -1,3 +1,7 @@
+
+# so far this generic-sounding util module only has the useful
+# HdfWriter helper class (FIXME: rename?)
+
 import numpy
 
 from pyhdf.SD import SD, SDC
diff --git a/wrapper_funnel.py b/wrapper_funnel.py
index 7930aaee82163ec8cb44ab8b0235cfc3bd4fa776..3098929378dc54d9ffe0a24afeadcc3f3ac9cee0 100644
--- a/wrapper_funnel.py
+++ b/wrapper_funnel.py
@@ -1,3 +1,8 @@
+
+# InterCal processing wrapper for conducting forward processing on
+# funnel. this script is intended to be run under Condor (using the
+# submit file intercal.condor)
+
 import intercal_db
 import os
 import subprocess
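
Reviewer note: the claiming scheme described in the new get_job() comments can be summarized in a small standalone sketch. This is illustrative only and not part of the commit; the try_claim() name is made up, the connection parameters are assumed (ready_watch.py connects the same way), and the jobs table and its status values follow setup.sql.

    # illustrative sketch only -- not part of this commit
    import pgdb

    def try_claim(airs_date, airs_chunk):
        # connection details assumed; the real code uses connect() in intercal_db.py
        con = pgdb.connect(database='intercal')
        cur = con.cursor()

        # under "read committed", a second process running this same UPDATE
        # blocks on the row lock; once the first process commits, the status
        # clause no longer matches and rowcount comes back as 0
        cur.execute("update jobs set status = 'running' " +
                    "where airs_date = '%s' and " % airs_date +
                    "airs_chunk = %s and " % airs_chunk +
                    "status = 'ready'")
        claimed = (cur.rowcount == 1)

        # commit the claim (or roll back) promptly so a competing
        # transaction is not left blocked on the row lock
        if claimed:
            con.commit()
        else:
            con.rollback()
        return claimed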
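
For context on how the pieces fit together, here is a hedged sketch of the driver side of this interface (wrapper_funnel.py is the real consumer, and only its header is shown in this diff). get_forward_job(), job_done(), and job_failed() are the functions added above; process_chunk() and the use of the PID as the execution id are placeholders, not taken from the source.

    # illustrative sketch only -- not part of this commit
    import os
    import intercal_db

    def process_chunk(airs_date, airs_chunk):
        # placeholder for the actual InterCal processing of one job
        pass

    # claim the next 'ready' job; the execution id just identifies this
    # process in the executions table (using the PID is an assumption)
    airs_date, airs_chunk = intercal_db.get_forward_job('funnel', str(os.getpid()))
    try:
        process_chunk(airs_date, airs_chunk)
    except Exception:
        intercal_db.job_failed(airs_date, airs_chunk)
        raise
    else:
        intercal_db.job_done(airs_date, airs_chunk)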