Commit 606abe5c authored by Greg Quinn

Improvements in code documentation

parent d6005c50
# script for producing AIRS radiances in terms of MODIS spectral
# response functions
import calendar
import flags
import numpy
......
# This script uses an AIRS channel properties file and a MODIS
# spectral response file to produce an intermediate airs2modis.hdf
# file. This intermediate file is then used as input by the
......
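# A minimal sketch of the spectral-response convolution described in the
# header above (not the actual script): each MODIS band's spectral response
# function is interpolated onto the AIRS channel wavenumbers and used to
# weight the AIRS radiances. The function and array names are assumptions.
import numpy

def airs_to_modis_band(airs_wnum, airs_rad, srf_wnum, srf_resp):
    # interpolate the MODIS spectral response onto the AIRS channel grid;
    # channels outside the SRF's support get zero weight
    weights = numpy.interp(airs_wnum, srf_wnum, srf_resp, left=0.0, right=0.0)
    # SRF-weighted mean of the AIRS radiances for this MODIS band
    return (weights * airs_rad).sum() / weights.sum()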
# some utilities for dealing with AIRS granules
import PEATE_lib_v3
import glob
import pgdb
......
# General brightness temperature routines
import numpy
......
# script for conducting InterCal processing for a set of contiguous
# AIRS granules
import airs_l1b
import calendar
import modis_l1b
......@@ -8,6 +12,7 @@ import time
from pyhdf.SD import SD
# function for processing a single AIRS granule
def process(airs_date, airs_gran_num):
# ensure we have the AIRS file
......
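# For illustration only: a sketch of how the per-granule process() entry
# point above might be driven for one hour-long InterCal chunk of 10 AIRS
# granules (6 minutes each). The 1-based daily granule numbering is an
# assumption, not taken from the actual script.
def process_chunk(airs_date, airs_chunk):
    first_gran = airs_chunk * 10 + 1
    for airs_gran_num in range(first_gran, first_gran + 10):
        process(airs_date, airs_gran_num)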
# Module with values for flags output in the InterCal system
INVALID = 1
LOW = 2
......
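# For illustration only: if these values are meant to be OR-ed together as
# bit masks (an assumption; they may just be plain enumerations), a typical
# use against the 2378 AIRS channels could look like the sketch below.
import numpy

INVALID = 1   # values mirrored from the flags module above
LOW = 2
chan_flags = numpy.zeros(2378, numpy.int32)
chan_flags[[12, 57, 103]] |= INVALID           # mark some channels invalid
chan_flags[[200, 201]] |= LOW                  # mark others as low signal
usable = (chan_flags & (INVALID | LOW)) == 0   # channels with no flags set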
# pulls information out of an AIRS granule that might be useful in
# analyzing InterCal results
import sys
from pyhdf.SD import SD, SDC
......
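# A minimal sketch of pulling one dataset out of an AIRS granule with the
# pyhdf imports shown above; the default dataset name 'radiances' is an
# assumption about the granule layout.
from pyhdf.SD import SD, SDC

def read_dataset(path, name='radiances'):
    # open the granule read-only and copy one SDS into a numpy array
    sd = SD(path, SDC.READ)
    sds = sd.select(name)
    data = sds[:]
    sds.endaccess()
    sd.end()
    return data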
# submit file for InterCal forward processing on funnel
universe = vanilla
initial_dir = /data/gregq/condor/submit
......
# utilities for interacting with the InterCal processing database
import pgdb
import time
# claim a job for forward processing. claiming a job just sets its
# status to 'running' in the DB, but it is done in such a way as to
# ensure that only one process can claim a given job. returns an
# (airs_date, airs_chunk) pair. the processing system name ('funnel'
# or 'peate' for starters) and an identifier within that system are
# input parameters
def get_forward_job(system, execution_id):
# connect to DB
con = connect()
cur = con.cursor()
# repeatedly find the most recent job that's in the 'ready'
# state and try to claim it. this should never take more than a
# few iterations since there are (at least for now) at most 8
# concurrent processes doing forward-processing
for i in range(16):
# find the most recent 'ready' job
cur.execute("select airs_date, airs_chunk " +
"from jobs " +
"where status = 'ready' " +
......@@ -14,16 +30,29 @@ def get_forward_job(system, execution_id):
"limit 1")
airs_date, airs_chunk = cur.fetchone()
# try and claim it
if get_job(airs_date, airs_chunk, system, execution_id, con):
return airs_date, airs_chunk
# again, this really shouldn't happen
raise Exception('could not get a job to execute')
# claim the job with the given data and chunk number. system and
# execution_id are the same as for get_forward_job(). con can be
# set if a DB connection is already established
def get_job(airs_date, airs_chunk, system, execution_id, con=None):
# connect to the DB if not already
if con is None:
con = connect()
# we avoid race conditions here by relying on PostgreSQL's "read
# committed" isolation level. the query below will block if
# another uncommitted transaction has already written this job
# to 'running'. if such a transaction commits, then our
# transaction will update no rows (since the status clause is
# no longer satisfied). if the other transaction is rolled back,
# then our transaction will update one row
cur = con.cursor()
cur.execute("update jobs set status = 'running' " +
"where airs_date = '%s' and " % airs_date +
......@@ -32,6 +61,8 @@ def get_job(airs_date, airs_chunk, system, execution_id, con=None):
if cur.rowcount == 0:
return False
# we got the claim. now add a row to the executions table to
# document our process
start_time = get_time()
cur.execute("insert into executions (airs_date, airs_chunk, " +
"system, id, " +
......@@ -40,13 +71,16 @@ def get_job(airs_date, airs_chunk, system, execution_id, con=None):
"'%s', '%s'," % (system, execution_id) +
"'%s')" % start_time)
# don't forget to commit!
con.commit()
return True
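# For illustration, a restatement of the claim pattern above using DB-API
# parameter binding (the same %s style used by ready_watch.py) instead of
# string interpolation. The status = 'ready' guard is an assumption about
# the elided part of the WHERE clause; this is a sketch, not the module's
# actual code.
def claim_job_sketch(con, airs_date, airs_chunk):
    cur = con.cursor()
    cur.execute("update jobs set status = 'running' " +
                "where airs_date = %s and airs_chunk = %s and " +
                "status = 'ready'",
                (airs_date, airs_chunk))
    if cur.rowcount == 0:
        return False
    con.commit()
    return True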
# for marking a job as successfully completed
def job_done(airs_date, airs_chunk):
job_done_or_failed(airs_date, airs_chunk, 'done')
# for marking a job as failed
def job_failed(airs_date, airs_chunk):
job_done_or_failed(airs_date, airs_chunk, 'failed')
......@@ -66,13 +100,20 @@ def get_time():
# either with success or failure
def job_done_or_failed(airs_date, airs_chunk, status):
# connect to the DB
con = connect()
cur = con.cursor()
# record the end-time of this execution
stop_time = get_time()
cur.execute("update executions set stop_time = '%s' " % stop_time +
"where airs_date = '%s' and " % airs_date +
"airs_chunk = '%s'" % airs_chunk)
# update the jobs table so this job is no longer marked 'running'
cur.execute("update jobs set status = '%s' " % status +
"where airs_date = '%s' and " % airs_date +
"airs_chunk = %s" % airs_chunk)
# don't forget to commit!
con.commit()
-- SQL to add jobs to the InterCal processing DB for all AIRS
-- granules up through the current day. this should be run daily to
-- keep the jobs table current
insert into jobs
select '2003-01-01'::date + s1.i, s2.i, 'held'
from generate_series(0, current_date - '2003-01-01') as s1(i),
......
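# A rough Python restatement of the daily job-insertion step above, for
# illustration only: one 'held' job per (day, hour-chunk) from 2003-01-01
# through the current day. The explicit column list and the 24 chunks per
# day are assumptions.
import calendar
import pgdb
import time

start = calendar.timegm((2003, 1, 1, 0, 0, 0))
num_days = int((time.time() - start) // 86400) + 1
rows = [(time.strftime('%Y-%m-%d', time.gmtime(start + day * 86400)), chunk, 'held')
        for day in range(num_days)
        for chunk in range(24)]
con = pgdb.connect(database='intercal')
cur = con.cursor()
cur.executemany("insert into jobs (airs_date, airs_chunk, status) " +
                "values (%s, %s, %s)", rows)
con.commit()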
# first stage of bringing MODIS radiances into terms of AIRS spatial
# footprints (after collocation)
import numpy
import sys
......
# final stage of bringing MODIS radiances into terms of AIRS spatial
# footprints
import numpy
import sys
......
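# A minimal sketch of the core reduction these two stages perform, assuming
# the collocation step yields, for each AIRS footprint, the indices of the
# MODIS pixels that fall inside it. Array names and shapes are assumptions.
import numpy

def average_onto_footprints(modis_rad, coloc_idx, coloc_count):
    # modis_rad:   1-D MODIS radiances for one band (flattened scene)
    # coloc_idx:   (n_footprints, max_pixels) indices of collocated pixels
    # coloc_count: number of valid indices per footprint
    footprint_rad = numpy.zeros(coloc_idx.shape[0], numpy.float64)
    for i in range(coloc_idx.shape[0]):
        n = coloc_count[i]
        if n > 0:
            footprint_rad[i] = modis_rad[coloc_idx[i, :n]].mean()
        else:
            footprint_rad[i] = numpy.nan
    return footprint_rad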
# Compute MODIS brightness temperature from radiance. So far, this ONLY
# applies to Aqua
......
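# A minimal sketch of the radiance-to-brightness-temperature conversion the
# header above describes: the inverse Planck function in wavenumber space.
# The radiation constants below are the standard approximate values; the
# per-band effective wavenumbers and any Aqua-specific band-correction
# terms are not reproduced here.
import numpy

C1 = 1.191042e-5   # first radiation constant, mW / (m^2 sr cm^-4)
C2 = 1.4387752     # second radiation constant, K cm

def bright_temp(wnum, radiance):
    # radiance in mW/(m^2 sr cm^-1) at wavenumber wnum (cm^-1) -> Kelvin
    return C2 * wnum / numpy.log(1.0 + (C1 * wnum ** 3) / radiance)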
# utilities for processing MODIS L1A files into L1B
import PEATE_lib_v3
import calendar
import glob
......
# script to check Steve's database for InterCal "jobs" (groups of
# 10 AIRS granules) that are ready for processing. besides the AIRS
# L1B files themselves, we also need MODIS L1A files and the attitude
# and ephemeris files needed to process them into L1B. this needs to
# be run frequently to keep the database current
import calendar
import numpy
import pgdb
import time
# helper for parsing PostgreSQL dates
def parse_time(time_str):
return calendar.timegm(time.strptime(time_str, '%Y-%m-%d'))
# connect to Steve's database
con = pgdb.connect(host='twister', database='PEATE_v3', password='abc')
cur = con.cursor()
# get times of all needed files from the DB, sorted
cur.execute('select unix_t from files where type < 4 order by unix_t')
mod_times = numpy.array(cur.fetchall()).flatten()
cur.execute('select unix_t from anc_files where type = 8 order by unix_t')
......@@ -15,13 +26,18 @@ att_times = numpy.array(cur.fetchall()).flatten()
cur.execute('select unix_t from anc_files where type = 9 order by unix_t')
eph_times = numpy.array(cur.fetchall()).flatten()
# connect to the InterCal processing DB
con = pgdb.connect(database='intercal')
cur = con.cursor()
# determine the current range of AIRS granules to consider
cur.execute('select min(airs_date), max(airs_date) from jobs')
min_date, max_date = (parse_time(s) for s in cur.fetchone())
num_days = ((max_date - min_date) / 86400) + 1
# create masks of which MODIS, attitude, and ephemeris files are
# available
offsets = (mod_times - min_date) / (5 * 60)
mod_mask = numpy.zeros((num_days * 288,), numpy.bool)
mod_mask[offsets] = True
......@@ -34,18 +50,28 @@ offsets = (eph_times - min_date + (12 * 60 * 60)) / (24 * 60 * 60)
eph_mask = numpy.zeros((num_days,), numpy.bool)
eph_mask[offsets] = True
# now loop through InterCal jobs labeled 'held' and for each check
# if all its needed dependencies are available. note that the code
# below is dependent on the fact that we are processing the AIRS
# granules an hour at a time
ready_list = []
cur.execute("select airs_date, airs_chunk from jobs where status = 'held'")
for airs_date, airs_chunk in cur:
# determine the nominal time for this AIRS granule
airs_time = parse_time(airs_date) + (airs_chunk * 60 * 60)
# we need the 14 MODIS L1A files starting with the same nominal
# time as the first AIRS file (note that AIRS nominal times are
# about 5.5 minutes late compared to MODIS)
num_mod_files = 14
mod_offset_ini = (airs_time - min_date) / (5 * 60)
mod_offset_fin = mod_offset_ini + num_mod_files
if not mod_mask[mod_offset_ini:mod_offset_fin].all():
continue
# unless the MODIS files cross a two-hour boundary, they all fit
# between 2 attitude files. otherwise, we'll need 3
if (airs_chunk % 2) == 1:
num_att_files = 3
else:
......@@ -55,6 +81,8 @@ for airs_date, airs_chunk in cur:
if not att_mask[att_offset_ini:att_offset_fin].all():
continue
# unless the MODIS files cross noon UTC, they all fit between 2
# ephemeris files. otherwise, we'll need 3
if airs_chunk == 11:
num_eph_files = 3
else:
......@@ -64,8 +92,11 @@ for airs_date, airs_chunk in cur:
if not eph_mask[eph_offset_ini:eph_offset_fin].all():
continue
# if we get here, the job's ready
ready_list.append((airs_date, airs_chunk))
# update the InterCal processing DB with the jobs we now consider
# ready
cur.executemany("update jobs set status = 'ready' " +
"where airs_date = %s and airs_chunk = %s",
ready_list)
......
-- SQL to initialize the InterCal processing database
-- the jobs table contains one row for every hour of AIRS data. new
-- jobs are added via make_jobs.sql. when jobs are first added they
-- are placed in the 'held' status. when the ready_watch.py script
-- determines that all needed data is available, the job is moved to
-- 'ready'. it can then be claimed (set 'running') by a process in
-- the cluster or on funnel. after processing, it is set to 'failed'
-- or 'done' depending on the result. these last three stages are
-- managed via the intercal_db.py module
create domain status text check (value = 'held' or
value = 'ready' or
value = 'running' or
......@@ -11,6 +21,8 @@ create table jobs (
primary key (airs_date, airs_chunk)
);
-- each time a process executes an InterCal job, it records an entry
-- in the executions table for posterity
create domain system text check (value = 'funnel' or value = 'peate');
create table executions (
airs_date date,
......
# so far this generic-sounding util module only has the useful
# HdfWriter helper class (FIXME: rename?)
import numpy
from pyhdf.SD import SD, SDC
......
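# A sketch of the kind of HdfWriter helper described above, writing named
# numpy arrays to an HDF4 file via pyhdf. The class's real interface is not
# shown in this diff, so the method names here are assumptions.
import numpy
from pyhdf.SD import SD, SDC

class HdfWriter:
    # map numpy dtypes to HDF4 type codes (extend as needed)
    TYPES = {numpy.dtype('float32'): SDC.FLOAT32,
             numpy.dtype('float64'): SDC.FLOAT64,
             numpy.dtype('int32'): SDC.INT32}

    def __init__(self, filename):
        self.sd = SD(filename, SDC.WRITE | SDC.CREATE)

    def write(self, name, data):
        # create one SDS per array and write it in full
        data = numpy.asarray(data)
        sds = self.sd.create(name, self.TYPES[data.dtype], data.shape)
        sds[:] = data
        sds.endaccess()

    def close(self):
        self.sd.end()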
# InterCal processing wrapper for conducting forward processing on
# funnel. this script is intended to be run under Condor (using the
# submit file intercal.condor)
import intercal_db
import os
import subprocess
......
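# For illustration, a sketch of how such a wrapper can tie the pieces in
# this commit together: claim a job through intercal_db, hand it to a
# worker via subprocess, and record the outcome. The worker command and the
# execution identifier scheme are placeholders, not the wrapper's actual
# contents.
import intercal_db
import os
import subprocess
import sys

execution_id = '%s:%d' % (os.uname()[1], os.getpid())  # placeholder id
airs_date, airs_chunk = intercal_db.get_forward_job('funnel', execution_id)
ret = subprocess.call(['python', 'run_intercal_chunk.py',   # hypothetical worker
                       str(airs_date), str(airs_chunk)])
if ret == 0:
    intercal_db.job_done(airs_date, airs_chunk)
else:
    intercal_db.job_failed(airs_date, airs_chunk)
sys.exit(ret)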