Skip to content
Snippets Groups Projects
Commit 08aef491 authored by Levi Pfantz's avatar Levi Pfantz
Browse files

Add forward processing scripts

parent 8c85ad8f
No related branches found
No related tags found
No related merge requests found
This diff is collapsed.
#On a machine with satbuf 1-3 mounted all you should need to do is set lc_exec_path and run this file in an enviorment with pika installed. (Assuming it's in the same dir as amqpfind.py)
import warnings
warnings.filterwarnings(action="ignore", message=r"datetime.datetime.utcnow")
import pathlib
import subprocess
from concurrent import futures
import logging
import re
import os
import glob
import time
import traceback
logger = logging.getLogger(__name__)
realtime=True
logs_dir="py_logs_realtime"
base_outputdir="GGGLM-forward-realtime"
gglm_exec_path="/data/users/lpfantz/gglm_realtime/cspp-geo-gridded-glm-1.1/bin/cspp-geo-gglm.sh"
non_realtime_reg_patt_match = r'LCFA_G1[6-9]_s[0-9]{11}400_e'
non_realtime_reg_patt_extract = r'.*LCFA_G1[6-9]_s[0-9]{11}'
time_reg_patt=r'(?<=LCFA_G1[6-9]_s)[0-9]{11}(?=[0-9]{3}\_e)'
valid_glm_patt=r'\/glm\/L2\/LCFA\/OR_GLM-L2-LCFA'
def create_new_logger(id_str):
if not os.path.exists(logs_dir):
os.makedirs(logs_dir)
logpath = f"{logs_dir}/{id_str}.log"
local_logger = logging.getLogger(id_str)
local_logger.setLevel(logging.INFO)
ch = logging.FileHandler(logpath)
ch.setFormatter(logging.Formatter('%(asctime)s, %(message)s', datefmt='%Y-%m-%d %H:%M:%S'))
local_logger.addHandler(ch)
return local_logger
def log_output(process, logger_to_use, prefix_str=""):
if prefix_str:
logger_to_use.info(prefix_str)
if process.stdout:
for line in process.stdout.decode().splitlines():
logger_to_use.info(line)
if process.stderr:
for line in process.stderr.decode().splitlines():
logger_to_use.error(line)
ingest_logger=create_new_logger("ingest")
ingest_all_logger=create_new_logger("ingest_all")
time_pass_logger=create_new_logger("time_pass")
forward_proc_logger=create_new_logger("forward_proc")
errors_logger=create_new_logger("errors")
usr_bin_time_logger=create_new_logger("usr_bin_time")
def run(path, id_stamp):
try:
args = ['/usr/bin/time', gglm_exec_path, "--create-tiles", "-o", f"{base_outputdir}/{id_stamp}"]
if realtime:
args.append('-r')
if isinstance(path, list):
args.extend(path)
else:
args.append(path)
start_time = time.time()
proc = subprocess.run(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
execution_time=time.time() - start_time
exit_code=proc.returncode
time_pass_logger.info(f"{path}, {execution_time}, {exit_code}")
log_output(proc, usr_bin_time_logger, prefix_str=f"\n\n{path}")
if exit_code != 0:
log_output(proc, errors_logger)
except Exception as e:
forward_proc_logger.info(str(e))
tb_str = traceback.format_exc()
forward_proc_logger.info(tb_str)
forward_proc_logger.info("------------------------------------------------------------")
def process(proc):
try:
while True:
data = proc.stdout.readline()
ingest_all_logger.info(data.decode())
if data:
prefix, data = data.decode().strip().split()
if not re.search(valid_glm_patt, data):
continue
if data.startswith("/data"):
data=data[1:]
prefix=prefix.split('.')[0]
data=f"/{prefix}_{data}"
ingest_logger.info(data)
id_stamp=re.search(time_reg_patt, data)
id_stamp=id_stamp.group()
if 'G16' in data:
id_stamp="G16/"+id_stamp
else:
id_stamp="G18/"+id_stamp
id_stamp+=data.split('/', 2)[1]
if realtime:
run(data, id_stamp)
elif re.search(non_realtime_reg_patt_match, data):
match=re.search(non_realtime_reg_patt_extract, data)
str_to_run=match.group()+"*"
files_to_run=glob.glob(str_to_run)
run(files_to_run, id_stamp)
except Exception as e:
logger.warning(e)
if __name__ == '__main__':
logging_format = '[%(levelname)s: %(asctime)s : %(name)s] %(message)s'
logging.basicConfig(format=logging_format, datefmt='%Y-%m-%d %H:%M:%S', level=logging.INFO, handlers=[
logging.StreamHandler()
])
argsG16 = [
"python amqpfind.py -H mq2.ssec.wisc.edu -u sdsuser -p sdsmq -H mq1.ssec.wisc.edu -u sdsuser -p sdsmq -X satellite -w 10000. -C 'geo.goes.g16.glm.*.*.*.*.*' -j '{server_ip} {path}' -k '(coverage, start_time)'"]
argsG18 = [
"python amqpfind.py -H mq2.ssec.wisc.edu -u sdsuser -p sdsmq -H mq1.ssec.wisc.edu -u sdsuser -p sdsmq -X satellite -w 10000. -C 'geo.goes.g18.glm.*.*.*.*.*' -j '{server_ip} {path}' -k '(coverage, start_time)'"]
procG16 = subprocess.Popen(argsG16, shell=True, stdout=subprocess.PIPE)
procG18 = subprocess.Popen(argsG18, shell=True, stdout=subprocess.PIPE)
executor = futures.ThreadPoolExecutor(max_workers=2)
thread_pools = (executor.submit(process, procG16), executor.submit(process, procG18))
futures.wait(thread_pools)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment