# utilities for interacting with the InterCal processing database

import pgdb
import time


# helper for connecting to the database
def connect():
    return pgdb.connect(database='intercal')


# helper for getting the current UTC time in a PostgreSQL-friendly
# format
def get_time():
    return time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime())


# claim a job for forward processing. claiming a job just sets its
# status as 'running' in the DB, but it is done in such a way as to
# ensure that only one process can claim a given job. returns an
# (airs_date, airs_chunk) pair. the processing system name ('funnel'
# or 'peate' for starters) and an identifier within that system are
# input parameters
def get_forward_job(system, system_id):
    # connect to DB
    con = connect()
    cur = con.cursor()
    # repeatedly find the most recent job that's in the 'ready'
    # state and try to claim it. this should never take more than a
    # few iterations since there are (at least for now) at most 8
    # concurrent processes doing forward-processing
    for _ in range(16):
        # find the most recent 'ready' job
        cur.execute("select airs_date, airs_chunk "
                    "from intercal_jobs "
                    "where status = 'ready' "
                    "order by airs_date desc, airs_chunk desc "
                    "limit 1")
        row = cur.fetchone()
        if row is None:
            # no 'ready' jobs exist at all; retrying won't help
            break
        airs_date, airs_chunk = row
        # try and claim it
        if get_intercal_job(airs_date, airs_chunk, system, system_id, con):
            return airs_date, airs_chunk
    # again, this really shouldn't happen
    raise Exception('could not get a job to execute')


# claim the job with the given date and chunk number. system and
# system_id are the same as for get_forward_job(). con can be
# set if a DB connection is already established. returns True if the
# claim succeeded and False if another process got there first
def get_intercal_job(airs_date, airs_chunk, system, system_id, con=None):
    # connect to the DB if not already
    if con is None:
        con = connect()
    # we avoid race conditions here by relying on PostgreSQL's "read
    # committed" isolation level. the query below will block if
    # another uncommitted transaction has already written this job
    # to 'running'. if such a transaction commits, then our
    # transaction will update no rows (since the status clause is
    # no longer satisfied). if the other transaction is rolled back,
    # then our transaction will update one row
    cur = con.cursor()
    # values are passed as query parameters so the driver handles all
    # quoting and escaping
    cur.execute("update intercal_jobs set status = 'running' "
                "where airs_date = %s and "
                "airs_chunk = %s and "
                "status = 'ready'",
                (airs_date, airs_chunk))
    if cur.rowcount == 0:
        return False
    # we got the claim. now add a row to the executions table to
    # document our process
    start_time = get_time()
    cur.execute("insert into intercal_executions (airs_date, airs_chunk, "
                "system, id, "
                "start_time) "
                "values (%s, %s, %s, %s, %s)",
                (airs_date, airs_chunk, system, system_id, start_time))
    # don't forget to commit!
    con.commit()
    return True


# for marking a job as successfully completed
def intercal_done(airs_date, airs_chunk):
    intercal_done_or_failed(airs_date, airs_chunk, 'done')


# for marking a job as failed
def intercal_failed(airs_date, airs_chunk):
    intercal_done_or_failed(airs_date, airs_chunk, 'failed')


# helper for updating the database once execution is completed,
# either with success or failure
def intercal_done_or_failed(airs_date, airs_chunk, status):
    # connect to the DB
    con = connect()
    cur = con.cursor()
    # record the end-time of this execution
    # NOTE(review): this matches on (airs_date, airs_chunk) only, so it
    # stamps every execution row for this chunk, not just the current
    # one -- confirm this is intended
    stop_time = get_time()
    cur.execute("update intercal_executions "
                "set stop_time = %s "
                "where airs_date = %s and "
                "airs_chunk = %s",
                (stop_time, airs_date, airs_chunk))
    # update the jobs table so this job is no longer marked 'running'
    cur.execute("update intercal_jobs "
                "set status = %s "
                "where airs_date = %s and "
                "airs_chunk = %s",
                (status, airs_date, airs_chunk))
    # don't forget to commit!
    con.commit()


# move all idle grouper jobs for the given chunk to 'running' and
# record a grouper execution for them. returns (execution_id,
# grouper_ids, grouper_names)
def start_groupers(airs_date, airs_chunk, system, system_id):
    # connect to DB
    con = connect()
    cur = con.cursor()
    # set all grouper jobs for this chunk to running
    cur.execute("update grouper_jobs set status = 'running' "
                "where airs_date = %s and "
                "airs_chunk = %s and "
                "status = 'idle' "
                "returning grouper_id",
                (airs_date, airs_chunk))
    grouper_ids = [row[0] for row in cur]
    # create a grouper_execution row
    start_time = get_time()
    cur.execute("insert into grouper_executions (type, system, id, "
                "start_time) "
                "values ('forward', %s, %s, %s) "
                "returning execution_id",
                (system, system_id, start_time))
    execution_id = cur.fetchone()[0]
    # create grouper execution mappings. every value is a proper query
    # parameter, one parameter tuple per grouper
    cur.executemany("insert into grouper_execution_mappings (execution_id, "
                    "airs_date, "
                    "airs_chunk, "
                    "grouper_id) "
                    "values (%s, %s, %s, %s)",
                    [(execution_id, airs_date, airs_chunk, grouper_id)
                     for grouper_id in grouper_ids])
    # updates are done now, so commit
    con.commit()
    # get the names of the groupers so we can return them
    # FIXME: better way to do this?
    grouper_names = []
    for grouper_id in grouper_ids:
        cur.execute("select name from groupers "
                    "where grouper_id = %s",
                    (grouper_id,))
        grouper_names.append(cur.fetchone()[0])
    return execution_id, grouper_ids, grouper_names


# for marking grouper jobs as successfully completed
def groupers_done(airs_date, airs_chunk, execution_id, grouper_ids):
    groupers_done_or_failed(airs_date, airs_chunk, execution_id,
                            grouper_ids, 'done')


# for marking grouper jobs as failed
def groupers_failed(airs_date, airs_chunk, execution_id, grouper_ids):
    groupers_done_or_failed(airs_date, airs_chunk, execution_id,
                            grouper_ids, 'failed')


# helper for updating the database once a grouper execution is
# completed, either with success or failure
def groupers_done_or_failed(airs_date, airs_chunk, execution_id,
                            grouper_ids, status):
    # connect to the DB
    con = connect()
    cur = con.cursor()
    # record the end-time of this execution
    stop_time = get_time()
    cur.execute("update grouper_executions "
                "set stop_time = %s "
                "where execution_id = %s",
                (stop_time, execution_id))
    # update the grouper_jobs table so these grouper jobs are no
    # longer marked 'running'. one parameter tuple per grouper
    cur.executemany("update grouper_jobs "
                    "set status = %s "
                    "where airs_date = %s and "
                    "airs_chunk = %s and "
                    "grouper_id = %s",
                    [(status, airs_date, airs_chunk, grouper_id)
                     for grouper_id in grouper_ids])
    # don't forget to commit!
    con.commit()