From 606abe5c019bdff6d5fb7e9e2bc2b7187fab21fe Mon Sep 17 00:00:00 2001
From: Greg Quinn <greg.quinn@ssec.wisc.edu>
Date: Tue, 13 Oct 2009 02:32:38 +0000
Subject: [PATCH] Improvements in code documentation

---
 airs2modis.py         |  4 ++++
 airs2modis_pre.py     |  1 +
 airs_l1b.py           |  3 +++
 bright.py             |  1 +
 driver.py             |  5 +++++
 flags.py              |  3 ++-
 get_meta.py           |  4 ++++
 intercal.condor       |  3 +++
 intercal_db.py        | 43 ++++++++++++++++++++++++++++++++++++++++++-
 make_jobs.sql         |  5 +++++
 modis2airs.py         |  4 ++++
 modis2airs_collect.py |  4 ++++
 modis_bright.py       |  1 +
 modis_l1b.py          |  3 +++
 ready_watch.py        | 31 +++++++++++++++++++++++++++++++
 setup.sql             | 12 ++++++++++++
 util.py               |  4 ++++
 wrapper_funnel.py     |  5 +++++
 18 files changed, 134 insertions(+), 2 deletions(-)

diff --git a/airs2modis.py b/airs2modis.py
index 53c26a6..7e05ca1 100644
--- a/airs2modis.py
+++ b/airs2modis.py
@@ -1,3 +1,7 @@
+
+# script for producing AIRS radiances in terms of MODIS spectral
+# response functions
+
 import calendar
 import flags
 import numpy
diff --git a/airs2modis_pre.py b/airs2modis_pre.py
index 39dcda8..beaeb54 100644
--- a/airs2modis_pre.py
+++ b/airs2modis_pre.py
@@ -1,3 +1,4 @@
+
 # This script uses an AIRS channel properties files and a MODIS
 # spectral response file to produce an intermediate airs2modis.hdf
 # file. This intermediate file is then used as input by the
diff --git a/airs_l1b.py b/airs_l1b.py
index 32b74fa..294a74f 100644
--- a/airs_l1b.py
+++ b/airs_l1b.py
@@ -1,3 +1,6 @@
+
+# some utilities for dealing with AIRS granules
+
 import PEATE_lib_v3
 import glob
 import pgdb
diff --git a/bright.py b/bright.py
index 76a19f7..97b9fa7 100644
--- a/bright.py
+++ b/bright.py
@@ -1,3 +1,4 @@
+
 # General brightness temperature routines
 
 import numpy
diff --git a/driver.py b/driver.py
index 83bc35a..92b4b50 100644
--- a/driver.py
+++ b/driver.py
@@ -1,3 +1,7 @@
+
+# script for conducting InterCal processing for a set of contiguous
+# AIRS granules
+
 import airs_l1b
 import calendar
 import modis_l1b
@@ -8,6 +12,7 @@ import time
 
 from pyhdf.SD import SD
 
+# function for processing a single AIRS granule
 def process(airs_date, airs_gran_num):
 
     # ensure we have the AIRS file
diff --git a/flags.py b/flags.py
index 0da9372..35e1f96 100644
--- a/flags.py
+++ b/flags.py
@@ -1,4 +1,5 @@
-# Module with values for flags output in the intercal system
+
+# Module with values for flags output in the InterCal system
 
 INVALID = 1
 LOW = 2
diff --git a/get_meta.py b/get_meta.py
index d8c5c10..5243fe1 100644
--- a/get_meta.py
+++ b/get_meta.py
@@ -1,3 +1,7 @@
+
+# pulls information out of an AIRS granule that might be useful in
+# analyzing InterCal results
+
 import sys
 
 from pyhdf.SD import SD, SDC
diff --git a/intercal.condor b/intercal.condor
index 9bf10af..15157ad 100644
--- a/intercal.condor
+++ b/intercal.condor
@@ -1,3 +1,6 @@
+
+# submit file for InterCal forward processing on funnel
+
 universe = vanilla
 
 initial_dir = /data/gregq/condor/submit
diff --git a/intercal_db.py b/intercal_db.py
index 1194f3b..9c49733 100644
--- a/intercal_db.py
+++ b/intercal_db.py
@@ -1,12 +1,28 @@
+
+# utilities for interacting with the InterCal processing database
+
 import pgdb
 import time
 
+# claim a job for forward processing. claiming a job just sets its
+# status as 'running' in the DB, but it is done in such a way as to
+# ensure that only one process can claim a given job. returns an
+# (airs_date, airs_chunk) pair. the processing system name ('funnel'
+# or 'peate' for starters) and an identifier within that system are
+# input parameters
 def get_forward_job(system, execution_id):
 
+    # connect to DB
     con = connect()
     cur = con.cursor()
 
+    # repeatedly find the most recent job that's in the 'ready'
+    # state and try to claim it. this should never take more than a
+    # few iterations since there are (at least for now) at most 8
+    # concurrent processes doing forward-processing
     for i in range(16):
+        # find the most recent 'ready' job
         cur.execute("select airs_date, airs_chunk " +
                     "from jobs " +
                     "where status = 'ready' " +
@@ -14,16 +30,29 @@ def get_forward_job(system, execution_id):
                     "limit 1")
         airs_date, airs_chunk = cur.fetchone()
 
+        # try to claim it
         if get_job(airs_date, airs_chunk, system, execution_id, con):
             return airs_date, airs_chunk
 
+    # again, this really shouldn't happen
     raise Exception('could not get a job to execute')
 
+# claim the job with the given date and chunk number. system and
+# execution_id are the same as for get_forward_job(). con can be
+# set if a DB connection is already established
 def get_job(airs_date, airs_chunk, system, execution_id, con=None):
 
+    # connect to the DB if not already
     if con == None:
         con = connect()
 
+    # we avoid race conditions here by relying on PostgreSQL's "read
+    # committed" isolation level. the query below will block if
+    # another uncommitted transaction has already set this job
+    # to 'running'. if such a transaction commits, then our
+    # transaction will update no rows (since the status clause is
+    # no longer satisfied). if the other transaction is rolled back,
+    # then our transaction will update one row
     cur = con.cursor()
     cur.execute("update jobs set status = 'running' " +
                 "where airs_date = '%s' and " % airs_date +
@@ -32,6 +61,8 @@ def get_job(airs_date, airs_chunk, system, execution_id, con=None):
     if cur.rowcount == 0:
         return False
 
+    # we got the claim. now add a row to the executions table to
+    # document our process
     start_time = get_time()
     cur.execute("insert into executions (airs_date, airs_chunk, " +
                 "system, id, " +
@@ -40,13 +71,16 @@ def get_job(airs_date, airs_chunk, system, execution_id, con=None):
                 "'%s', '%s'," % (system, execution_id) +
                 "'%s')" % start_time)
 
+    # don't forget to commit!
     con.commit()
 
     return True
 
+# for marking a job as successfully completed
 def job_done(airs_date, airs_chunk):
     job_done_or_failed(airs_date, airs_chunk, 'done')
 
+# for marking a job as failed
 def job_failed(airs_date, airs_chunk):
     job_done_or_failed(airs_date, airs_chunk, 'failed')
 
@@ -66,13 +100,20 @@ def get_time():
 # either with success or failure
 def job_done_or_failed(airs_date, airs_chunk, status):
 
-    stop_time = get_time()
+    # connect to the DB
     con = connect()
     cur = con.cursor()
+
+    # record the end-time of this execution
+    stop_time = get_time()
     cur.execute("update executions set stop_time = '%s' " % stop_time +
                 "where airs_date = '%s' and " % airs_date +
                 "airs_chunk = '%s'" % airs_chunk)
+
+    # update the jobs table so this job is no longer marked 'running'
     cur.execute("update jobs set status = '%s' " % status +
                 "where airs_date = '%s' and " % airs_date +
                 "airs_chunk = %s" % airs_chunk)
+
+    # don't forget to commit!
     con.commit()
diff --git a/make_jobs.sql b/make_jobs.sql
index 91b9249..51b690a 100644
--- a/make_jobs.sql
+++ b/make_jobs.sql
@@ -1,3 +1,8 @@
+
+-- SQL to add jobs to the InterCal processing DB for all AIRS
+-- granules up through the current day. this should be run daily to
+-- keep the jobs table current
+
 insert into jobs
 select '2003-01-01'::date + s1.i, s2.i, 'held'
 from generate_series(0, current_date - '2003-01-01') as s1(i),
diff --git a/modis2airs.py b/modis2airs.py
index 2e93460..a02576b 100644
--- a/modis2airs.py
+++ b/modis2airs.py
@@ -1,3 +1,7 @@
+
+# first stage of bringing MODIS radiances into terms of AIRS spatial
+# footprints (after collocation)
+
 import numpy
 import sys
 
diff --git a/modis2airs_collect.py b/modis2airs_collect.py
index ee1d863..7262c94 100644
--- a/modis2airs_collect.py
+++ b/modis2airs_collect.py
@@ -1,3 +1,7 @@
+
+# final stage of bringing MODIS radiances into terms of AIRS spatial
+# footprints
+
 import numpy
 import sys
 
diff --git a/modis_bright.py b/modis_bright.py
index 0d0d345..a270c02 100644
--- a/modis_bright.py
+++ b/modis_bright.py
@@ -1,3 +1,4 @@
+
 # Compute MODIS brightness temperature from radiance. So far, this ONLY
 # applies to Aqua
 
diff --git a/modis_l1b.py b/modis_l1b.py
index 794bdbb..949504a 100644
--- a/modis_l1b.py
+++ b/modis_l1b.py
@@ -1,3 +1,6 @@
+
+# utilities for processing MODIS L1A files into L1B
+
 import PEATE_lib_v3
 import calendar
 import glob
diff --git a/ready_watch.py b/ready_watch.py
index 23e1111..1924c97 100644
--- a/ready_watch.py
+++ b/ready_watch.py
@@ -1,13 +1,24 @@
+
+# script to check Steve's database for InterCal "jobs" (groups of
+# 10 AIRS granules) that are ready for processing. besides the AIRS
+# L1B files themselves, we also need MODIS L1A files and the attitude
+# and ephemeris files needed to process them into L1B. this needs to
+# be run frequently to keep the database current
+
 import calendar
 import numpy
 import pgdb
 import time
 
+# helper for parsing PostgreSQL dates
 def parse_time(time_str):
     return calendar.timegm(time.strptime(time_str, '%Y-%m-%d'))
 
+# connect to Steve's database
 con = pgdb.connect(host='twister', database='PEATE_v3', password='abc')
 cur = con.cursor()
+
+# get times of all needed files from the DB, sorted
 cur.execute('select unix_t from files where type < 4 order by unix_t')
 mod_times = numpy.array(cur.fetchall()).flatten()
 cur.execute('select unix_t from anc_files where type = 8 order by unix_t')
@@ -15,13 +26,18 @@ att_times = numpy.array(cur.fetchall()).flatten()
 cur.execute('select unix_t from anc_files where type = 9 order by unix_t')
 eph_times = numpy.array(cur.fetchall()).flatten()
 
+# connect to the InterCal processing DB
 con = pgdb.connect(database='intercal')
 cur = con.cursor()
 
+# determine the current range of AIRS granules to consider
 cur.execute('select min(airs_date), max(airs_date) from jobs')
 min_date, max_date = (parse_time(s) for s in cur.fetchone())
 num_days = ((max_date - min_date) / 86400) + 1
 
+# create masks of which MODIS, attitude, and ephemeris files are
+# available
+
 offsets = (mod_times - min_date) / (5 * 60)
 mod_mask = numpy.zeros((num_days * 288,), numpy.bool)
 mod_mask[offsets] = True
@@ -34,18 +50,28 @@ offsets = (eph_times - min_date + (12 * 60 * 60)) / (24 * 60 * 60)
 eph_mask = numpy.zeros((num_days,), numpy.bool)
 eph_mask[offsets] = True
 
+# now loop through InterCal jobs labeled 'held' and for each check
+# if all its needed dependencies are available. note that the code
+# below is dependent on the fact that we are processing the AIRS
+# granules an hour at a time
 ready_list = []
 cur.execute("select airs_date, airs_chunk from jobs where status = 'held'")
 for airs_date, airs_chunk in cur:
 
+    # determine the nominal time for this AIRS granule
     airs_time = parse_time(airs_date) + (airs_chunk * 60 * 60)
 
+    # we need the 14 MODIS L1A files starting with the same nominal
+    # time as the first AIRS file (note that AIRS nominal times are
+    # about 5.5 minutes late compared to MODIS)
     num_mod_files = 14
     mod_offset_ini = (airs_time - min_date) / (5 * 60)
     mod_offset_fin = mod_offset_ini + num_mod_files
     if not mod_mask[mod_offset_ini:mod_offset_fin].all():
         continue
 
+    # unless the MODIS files cross a two-hour boundary, they all fit
+    # between 2 attitude files. otherwise, we'll need 3
     if (airs_chunk % 2) == 1:
         num_att_files = 3
     else:
@@ -55,6 +81,8 @@ for airs_date, airs_chunk in cur:
     if not att_mask[att_offset_ini:att_offset_fin].all():
         continue
 
+    # unless the MODIS files cross noon UTC, they all fit between 2
+    # ephemeris files. otherwise, we'll need 3
     if airs_chunk == 11:
         num_eph_files = 3
     else:
@@ -64,8 +92,11 @@ for airs_date, airs_chunk in cur:
     if not eph_mask[eph_offset_ini:eph_offset_fin].all():
         continue
 
+    # if we get here, the job's ready
     ready_list.append((airs_date, airs_chunk))
 
+# update the InterCal processing DB with the jobs we now consider
+# ready
 cur.executemany("update jobs set status = 'ready' " +
                 "where airs_date = %s and airs_chunk = %s",
                 ready_list)
diff --git a/setup.sql b/setup.sql
index 6651b86..6e2dcc5 100644
--- a/setup.sql
+++ b/setup.sql
@@ -1,4 +1,14 @@
 
+-- SQL to initialize the InterCal processing database
+
+-- the jobs table contains one row for every hour of AIRS data. new
+-- jobs are added via make_jobs.sql. when jobs are first added they
+-- are placed in the 'held' status. when the ready_watch.py script
+-- determines that all needed data is available, the job is moved to
+-- 'ready'. it can then be claimed (set 'running') by a process in
+-- the cluster or on funnel. after processing, it is set to 'failed'
+-- or 'done' depending on the result. these last 3 stages are
+-- managed via the intercal_db.py module
 create domain status text check (value = 'held' or
                                  value = 'ready' or
                                  value = 'running' or
@@ -11,6 +21,8 @@ create table jobs (
     primary key (airs_date, airs_chunk)
 );
 
+-- each time a process executes an InterCal job, it records an entry
+-- in the executions table for posterity
 create domain system text check (value = 'funnel' or value = 'peate');
 create table executions (
     airs_date date,
diff --git a/util.py b/util.py
index a9046d6..e2071d1 100644
--- a/util.py
+++ b/util.py
@@ -1,3 +1,7 @@
+
+# so far this generic-sounding util module only has the useful
+# HdfWriter helper class (FIXME: rename?)
+
 import numpy
 
 from pyhdf.SD import SD, SDC
diff --git a/wrapper_funnel.py b/wrapper_funnel.py
index 7930aae..3098929 100644
--- a/wrapper_funnel.py
+++ b/wrapper_funnel.py
@@ -1,3 +1,8 @@
+
+# InterCal processing wrapper for conducting forward processing on
+# funnel. this script is intended to be run under Condor (using the
+# submit file intercal.condor)
+
 import intercal_db
 import os
 import subprocess
-- 
GitLab
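
The comments added in this patch describe a claim -> process -> done/failed job lifecycle managed through intercal_db.py. The sketch below (appended after the patch trailer, not part of the diff itself) illustrates how a wrapper such as wrapper_funnel.py might drive that cycle under the documented scheme. It is a minimal, illustrative sketch only: the run_one_job() helper, the driver.py command line, and the way execution_id is obtained are assumptions, while get_forward_job(), job_done(), and job_failed() are the intercal_db functions shown in the diff.

# minimal sketch, not part of the patch: one possible forward-processing
# wrapper built on the intercal_db functions documented above. the
# driver.py command line and the execution_id argument are assumptions.

import subprocess
import sys

import intercal_db

def run_one_job(system, execution_id):
    # claim the next 'ready' job; this marks it 'running' in the DB
    airs_date, airs_chunk = intercal_db.get_forward_job(system, execution_id)
    try:
        # hypothetical invocation of the per-chunk processing driver
        rc = subprocess.call(['python', 'driver.py',
                              str(airs_date), str(airs_chunk)])
    except OSError:
        rc = 1
    # record the outcome so the job leaves the 'running' state
    if rc == 0:
        intercal_db.job_done(airs_date, airs_chunk)
    else:
        intercal_db.job_failed(airs_date, airs_chunk)

if __name__ == '__main__':
    # e.g. the Condor submit file could pass a process identifier as argv[1]
    run_one_job('funnel', sys.argv[1])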