|
|
|
|
|
|
|
|
### Deploy the binary package to the user account cluster
|
|
|
We import the `flo3` interface python code for `fusion_matlab` into the software tree
|
|
|
`/mnt/software/geoffc` by running `rsync`...
|
|
|
```bash
|
|
|
sudo su - flo
|
|
|
cd /mnt/software/geoffc
|
|
|
mv fusion_matlab fusion_matlab_old
|
|
|
rsync -urLv /home/geoffc/code/PeateScience/local/dist/fusion_matlab . --progress --exclude=.*.sw*
|
|
|
```
|
|
|
### Deploy the glue code to the development (flo) account cluster
|
|
|
We import the `flo3` interface python code for `fusion_matlab` into the software tree
|
|
|
`/mnt/software/flo` by changing to the `flo` account and running `rsync`...
|
|
|
```bash
|
|
|
sudo su - flo
|
|
|
cd /mnt/software/flo/
|
|
|
mv fusion_matlab fusion_matlab_old
|
|
|
rsync -urLv /home/geoffc/code/PeateScience/local/dist/fusion_matlab . --progress --exclude=.*.sw*
|
|
|
```
|
|
|
|
|
|
### Commit glue code to PeateScience repo
|
|
|
The actual glue code was copied to `/mnt/software` in the last step, but pushing the `fusion_matlab` python code to the `PeateScience` repo will provide the submission scripts
|
|
|
`example_local_prepare.py` and `submit_fusion_matlab.py` for use on condor.
|
|
|
```bash
|
|
|
cd ~/code/PeateScience
|
|
|
git pull
|
|
|
git add ~/code/PeateScience/packages/fusion_matlab
|
|
|
git commit fusion_matlab -m "Initial commit of the fusion_matlab package."
|
|
|
git push
|
|
|
```
|
|
|
|
|
|
## Running the Fusion code on the cluster
|
|
|
We can now submit `fusion_matlab` to the cluster from condor, on the development (`flo`) account:
|
|
|
```bash
|
|
|
sudo su - flo
|
|
|
cd /home/geoffc/fusion_matlab/work/
|
|
|
|
|
|
$ python /home/geoffc/code/PeateScience/packages/fusion_matlab/submit_fusion_matlab.py
|
|
|
(INFO):submit_fusion_matlab.py:<module>:30: Submitting intervals...
|
|
|
(INFO):submit_fusion_matlab.py:<module>:32: Submitting interval 2015-04-17 14:36:00 -> 2015-04-17 14:36:59
|
|
|
(INFO):submit_fusion_matlab.py:<module>:36: There are 1 contexts in this interval
|
|
|
{'satellite': 'snpp', 'version': '1.0dev0', 'granule': datetime.datetime(2015, 4, 17, 14, 36)}
|
|
|
(INFO):submit_fusion_matlab.py:<module>:42: First context: {'satellite': 'snpp', 'version': '1.0dev0', 'granule': datetime.datetime(2015, 4, 17, 14, 36)}
|
|
|
(INFO):submit_fusion_matlab.py:<module>:43: Last context: {'satellite': 'snpp', 'version': '1.0dev0', 'granule': datetime.datetime(2015, 4, 17, 14, 36)}
|
|
|
(INFO):submit_fusion_matlab.py:<module>:44: xrange(86694864, 86694865)
|
|
|
```
|
|
|
We can keep track of running jobs by doing the various incantations:
|
|
|
```bash
|
|
|
sudo su - flo
|
|
|
condor_q -autoformat FloClusterComputations | sort | uniq -c
|
|
|
condor_q -constraint 'FloClusterComputations=="flo.sw.fusion_matlab:FUSION_MATLAB"' -constraint 'Owner=="flo"'
|
|
|
condor_q -autoformat FloClusterComputations Owner ClusterID ProcID
|
|
|
condor_q -format '%d' ClusterId -format '.%d\n' ProcId
|
|
|
condor_q -constraint 'FloClusterComputations=="flo.sw.fusion_matlab:FUSION_MATLAB"' -format '%d' ClusterId -format '.%d\n' ProcId
|
|
|
```
|
|
|
To look at the log files of a particular job(s)
|
|
|
```python
|
|
|
run -e /home/geoffc/git/sips_utils/snippets.py
|
|
|
job_range = (86694864, 86694865)
|
|
|
job_file_branches = [job_number_to_dir('/scratch/flo/jobs',job) for job in range(*job_range)]
|
|
|
if len(job_file_branches)>1:
|
|
|
job_stdout_files = list(np.squeeze([glob(dir+'-stdout') for dir in job_file_branches]))
|
|
|
job_stderr_files = list(np.squeeze([glob(dir+'-stderr') for dir in job_file_branches]))
|
|
|
else:
|
|
|
job_stdout_files = list([glob(dir+'-stdout') for dir in job_file_branches][0])
|
|
|
job_stderr_files = list([glob(dir+'-stderr') for dir in job_file_branches][0])
|
|
|
```
|
|
|
In order to check the database for the fusion matlab output
|
|
|
```sql
|
|
|
flo_user="-d postgresql://flo3@ratchet.sips/flo3"
|
|
|
> psql $flo_user -c "SELECT job,size,output,context,file_name from stored_products where computation='flo.sw.fusion_matlab:FUSION_MATLAB' and output='fused_l1b' order by file_name;"
|
|
|
job | size | output | context | file_name
|
|
|
----------+-----------+-----------+-------------------------------------------------------------------------------------------------------+-------------------------------------------------
|
|
|
91073252 | 366596456 | fused_l1b | "granule"=>"datetime.datetime(2015, 4, 18, 6, 6)", "version"=>"'1.0dev0'", "satellite"=>"'snpp'" | VNP02FSN.A2015108.0606.001.2018025180544.nc
|
|
|
(1 row)
|
|
|
```
|
|
|
|
|
|
To group granules by day/month etc...
|
|
|
```sql
|
|
|
psql $flo_user -c "SELECT date_trunc('months',pydt(context->'granule')) as m,count(*) from stored_products where computation='flo.sw.fusion_matlab:FUSION_MATLAB' and context->'satellite'='''snpp''' and context->'version'='''1.0dev0''' group by m order by m" | less
|
|
|
```
|
|
|
To select granules which match or are between certain dates:
|
|
|
```sql
|
|
|
psql $flo_user -c "SELECT job,size,context,file_name from stored_products where computation='flo.sw.fusion_matlab:FUSION_MATLAB' and context->'satellite'='''snpp''' and context->'version'='''1.0dev0''' and date_trunc('days',pydt(context->'granule'))='2015-01-01' order by file_name;" | less
|
|
|
|
|
|
psql $flo_user -c "SELECT job,size,context,file_name from stored_products where computation='flo.sw.fusion_matlab:FUSION_MATLAB' and context->'satellite'='''snpp''' and context->'version'='''1.0dev0''' and date_trunc('days',pydt(context->'granule'))>'2015-01-01' and date_trunc('days',pydt(context->'granule'))<'2015-01-03' order by file_name;" | less
|
|
|
```
|
|
|
To remove old files:
|
|
|
```sql
|
|
|
psql $flo_user -c "SELECT job, size, context, file_name FROM stored_products WHERE computation='flo.sw.fusion_matlab:FUSION_MATLAB' and context->'satellite'='''snpp''' and context->'version'='''1.0dev0''' order by file_name" | less
|
|
|
psql $flo_user -c "DELETE FROM stored_products WHERE computation='flo.sw.fusion_matlab:FUSION_MATLAB' and context->'satellite'='''snpp''' and context->'version'='''1.0dev0'''"
|
|
|
```
|
|
|
### Other Database Queries
|
|
|
|
|
|
```sql
|
|
|
psql $flo_user -c "SELECT pydt(context->'granule') as d,count(*) FROM stored_products WHERE computation='flo.sw.fusion_matlab:FUSION_MATLAB' and context->'satellite'='''snpp''' and context->'version'='''1.0dev0''' group by d order by d;" | less
|
|
|
|
|
|
psql $flo_user -c "SELECT pydt(context->'granule') as d,count(*) FROM stored_products WHERE computation='flo.sw.fusion_matlab:FUSION_MATLAB' and context->'satellite'='''snpp''' and context->'version'='''1.0dev0''' and date_trunc('days',pydt(context->'granule'))='2014-01-01' group by d order by d;" | less
|
|
|
|
|
|
psql $flo_user -c "SELECT job,size,context,file_name from stored_products where computation='flo.sw.fusion_matlab:FUSION_MATLAB' and context->'satellite'='''snpp''' and context->'version'='''1.0dev0''' order by file_name;" | less
|
|
|
|
|
|
psql $flo_user -c "SELECT job,size,context,file_name from stored_products where computation='flo.sw.fusion_matlab:FUSION_MATLAB' and context->'satellite'='''snpp''' and context->'version'='''1.0dev0''' and date_trunc('days',pydt(context->'granule'))='2014-01-01' order by file_name;" | less
|
|
|
|
|
|
flo3=> select x FROM generate_series('2015-04-01'::timestamp, '2015-04-30 23:59', '6 minutes') as x where not exists (select null from stored_products where computation='flo.sw.fusion_matlab:FUSION_MATLAB' and context->'satellite'='''snpp''' and output='fused_l1b' and x=pydt(context->'granule'));
|
|
|
|
|
|
# List file keys
|
|
|
psql $flo_user -tA -c "SELECT format ('flo3/%s/%s',job,file_name) FROM stored_products WHERE computation='flo.sw.fusion_matlab:FUSION_MATLAB' and context->'satellite'='''snpp''' and context->'version'='''1.0dev0''' order by file_name limit 5;"
|
|
|
|
|
|
# List file keys and status
|
|
|
psql $flo_user -tA -c "SELECT format ('flo3/%s/%s',job,file_name) FROM stored_products WHERE computation='flo.sw.fusion_matlab:FUSION_MATLAB' and context->'satellite'='''snpp''' and context->'version'='''1.0dev0''' order by file_name limit 5;" | xargs -n1 -IXX rados -p dev --id flo stat XX
|
|
|
|
|
|
# List file key basenames
|
|
|
psql $flo_user -tA -c "SELECT format ('flo3/%s/%s',job,file_name) FROM stored_products WHERE computation='flo.sw.fusion_matlab:FUSION_MATLAB' and context->'satellite'='''snpp''' and context->'version'='''1.0dev0''' order by file_name limit 5;" | xargs -n1 -IXX basename XX
|
|
|
|
|
|
# List the rados commands to download files using the database file keys.
|
|
|
psql $flo_user -tA -c "SELECT format ('flo3/%s/%s',job,file_name) FROM stored_products WHERE computation='flo.sw.fusion_matlab:FUSION_MATLAB' and context->'satellite'='''snpp''' and context->'version'='''1.0dev0''' order by file_name limit 5;" | xargs -n1 -IXX echo rados -p dev --id flo get XX "~/fusion_matlab/work/links/"$(basename XX)
|
|
|
|
|
|
# rados commands
|
|
|
rados -p dev --id flo get flo3/91069111/VNP02FSN.A2015091.0000.001.2018025170339.nc VNP02FSN.A2015091.0000.001.2018025170339.nc
|
|
|
```
|
|
|
|
|
|
## Running in Forward Stream
|
|
|
|
|
|
Job parameters for `FUSION_MATLAB` can be found from
|
|
|
```
|
|
|
flo3=> select * from forward_streams where name = 'FusionMatlab';
|
|
|
id | name | offset_start | offset_end | find_contexts_arguments | workflow_head | workflow_targets | workflow_download_onlies | job_mods | output_volume | num_retries | expiration
|
|
|
----+--------------+--------------+------------+-----------------------------------------------+------------------------------------+------------------------------------------------+--------------------------+-------------------------------+---------------+-------------+------------
|
|
|
43 | FusionMatlab | -4 days | 00:00:00 | "version"=>"'1.0dev3'", "satellite"=>"'snpp'" | flo.sw.fusion_matlab:FUSION_MATLAB | {flo.sw.fusion_matlab:FUSION_MATLAB;fused_l1b} | {} | "requests"=>"['Memory=8000']" | | |
|
|
|
(1 row)
|
|
|
```
|
|
|
and for `FUSION_MATLAB_QL`:
|
|
|
```
|
|
|
flo3=> select * from forward_streams where name = 'FusionMatlabDailyQL';
|
|
|
id | name | offset_start | offset_end | find_contexts_arguments | workflow_head | workflow_targets | workflow_download_onlies | job_mods | output_volume | num_retries | expiration
|
|
|
----+---------------------+--------------+------------+-----------------------------------------------+---------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------+------------------------------------------------------------------------------------------------------------------------------------------+---------------+-------------+------------
|
|
|
53 | FusionMatlabDailyQL | -6 days | -2 days | "version"=>"'1.0dev2'", "satellite"=>"'snpp'" | flo.sw.fusion_matlab:FUSION_MATLAB_QL | {flo.sw.fusion_matlab:FUSION_MATLAB_QL;fused_l1b_ql_band27_asc,flo.sw.fusion_matlab:FUSION_MATLAB_QL;fused_l1b_ql_band27_desc,flo.sw.fusion_matlab:FUSION_MATLAB_QL;fused_l1b_ql_band33_asc,flo.sw.fusion_matlab:FUSION_MATLAB_QL;fused_l1b_ql_band33_desc} | {} | "classads"=>"['HookKeyword=SCRATCH']", "requests"=>"['Scratch=3','Memory=8000']", "requirements"=>"['TARGET.Scratch >= RequestScratch']" | | |
|
|
|
(1 row)
|
|
|
```
|
|
|
To preview the submission of the Fusion Matlab level-1b files to Forward Stream, we enter the following in the `psql` shell:
|
|
|
```
|
|
|
explain INSERT INTO forward_streams (
|
|
|
name, offset_start, offset_end, find_contexts_arguments,
|
|
|
workflow_head, workflow_targets, job_mods
|
|
|
)
|
|
|
VALUES (
|
|
|
'FusionMatlab',
|
|
|
'-4 Days'::interval,
|
|
|
'00:00:00'::interval,
|
|
|
'version=>"''1.0dev4''", satellite=>"''snpp''"'::hstore,
|
|
|
'flo.sw.fusion_matlab:FUSION_MATLAB',
|
|
|
'{flo.sw.fusion_matlab:FUSION_MATLAB;fused_l1b}'::text[],
|
|
|
'requests=>"[''Memory=8000'']"'::hstore
|
|
|
)
|
|
|
;
|
|
|
```
|
|
|
giving
|
|
|
```
|
|
|
QUERY PLAN
|
|
|
---------------------------------------------------------------
|
|
|
Insert on forward_streams (cost=0.00..0.01 rows=1 width=288)
|
|
|
-> Result (cost=0.00..0.01 rows=1 width=288)
|
|
|
(2 rows)
|
|
|
```
|
|
|
To preview the submission of the Fusion Matlab Quicklooks to Forward Stream, we enter the following in the `psql` shell:
|
|
|
```
|
|
|
explain INSERT INTO forward_streams (
|
|
|
name, offset_start, offset_end, find_contexts_arguments,
|
|
|
workflow_head, workflow_targets, job_mods
|
|
|
)
|
|
|
VALUES (
|
|
|
'FusionMatlabDailyQL',
|
|
|
'-6 Days'::interval,
|
|
|
'-2 Days'::interval,
|
|
|
'version=>"''1.0dev3''", satellite=>"''snpp''"'::hstore,
|
|
|
'flo.sw.fusion_matlab:FUSION_MATLAB_QL',
|
|
|
'{flo.sw.fusion_matlab:FUSION_MATLAB_QL;fused_l1b_ql_band27_asc,flo.sw.fusion_matlab:FUSION_MATLAB_QL;fused_l1b_ql_band27_desc,flo.sw.fusion_matlab:FUSION_MATLAB_QL;fused_l1b_ql_band33_asc,flo.sw.fusion_matlab:FUSION_MATLAB_QL;fused_l1b_ql_band33_desc}'::text[],
|
|
|
'requirements=>"[''TARGET.Scratch >= RequestScratch'']", requests=>"[''Scratch=3'',''Memory=8000'']", classads=>"[''HookKeyword=SCRATCH'']"'::hstore
|
|
|
)
|
|
|
;
|
|
|
```
|
|
|
which outputs
|
|
|
```
|
|
|
QUERY PLAN
|
|
|
---------------------------------------------------------------
|
|
|
Insert on forward_streams (cost=0.00..0.01 rows=1 width=288)
|
|
|
-> Result (cost=0.00..0.01 rows=1 width=288)
|
|
|
(2 rows)
|
|
|
```
|
|
|
To actually submit the task, remove the `explain` keyword from the above invocation.
|
|
|
|
|
|
## Examining log files of failed jobs
|
|
|
The details of failed jobs can be found from
|
|
|
```
|
|
|
psql $flo_user -c "select * from failed_jobs where head_computation = 'flo.sw.fusion_matlab:FUSION_MATLAB' and context->'version'='''1.0dev1''' and timestamp > '2018-01-30';"
|
|
|
```
|
|
|
Generate a list of jobnumbers for failed jobs:
|
|
|
```
|
|
|
psql $flo_user -c "SELECT job, context FROM failed_jobs WHERE head_computation='flo.sw.fusion_matlab:FUSION_MATLAB' and context->'version'='''1.0dev1''' and timestamp > '2018-01-30' order by context;" | grep granule | gawk '{print $1}' > fusion_matlab_v1.0dev1_failed_granules.txt
|
|
|
```
|
|
|
Read a file containing the job numbers of failed jobs, and do something with them...
|
|
|
```python
|
|
|
file_obj = open('fusion_matlab_v1.0dev1_failed_granules.txt','r')
|
|
|
jobnums = file_obj.readlines()
|
|
|
file_obj.close()
|
|
|
jobnums = [int(x) for x in jobnums]
|
|
|
|
|
|
run -e /mnt/sdata/geoffc/git/sips_utils/snippets.py
|
|
|
|
|
|
job_file_branches = [job_number_to_dir('/scratch/flo/jobs',job) for job in jobnums]
|
|
|
job_stdout_files = list(np.squeeze([glob(dir+'-stdout') for dir in job_file_branches]))
|
|
|
job_stderr_files = list(np.squeeze([glob(dir+'-stderr') for dir in job_file_branches]))
|
|
|
|
|
|
for files in job_stdout_files:
|
|
|
result = search_logfile_for_string(files, 'input sounder_0')
|
|
|
if result != []:
|
|
|
result = search_logfile_for_string(files, 'Dateline granule')
|
|
|
if result != []:
|
|
|
print(result[0].replace('\n',''))
|
|
|
else:
|
|
|
print(files)
|
|
|
else:
|
|
|
pass
|
|
|
```
|
|
|
|
|
|
```python
|
|
|
for stdout_file, stderr_file in zip(job_stdout_files,job_stderr_files):
|
|
|
try:
|
|
|
if os.path.isfile(stderr_file) and (os.stat(stderr_file).st_size > 0):
|
|
|
print('\n>>> stderr_file = {}'.format(stderr_file))
|
|
|
file_obj = open(stdout_file,'r')
|
|
|
for line in file_obj.readlines():
|
|
|
searchObj = re.search( r'Dateline granule', line, re.M)
|
|
|
if searchObj:
|
|
|
line = line.replace('\n','')
|
|
|
print('Checking {}: {}'.format(stdout_file, line))
|
|
|
else:
|
|
|
print('Checking {}:'.format(stdout_file))
|
|
|
|
|
|
file_obj.seek(3)
|
|
|
line = file_obj.readline()
|
|
|
line = os.path.basename(line.replace('\n','').split(' ')[-1])
|
|
|
print(line)
|
|
|
|
|
|
file_obj.close()
|
|
|
else:
|
|
|
pass
|
|
|
#print('stderr_file {} does not exist or has zero size.'.format(stderr_file))
|
|
|
except Exception:
|
|
|
file_obj.close()
|
|
|
print('There was a problem with stderr_file {}'.format(stderr_file))
|
|
|
print(traceback.format_exc())
|
|
|
|
|
|
print('stdout_file = {}'.format(stdout_file))
|
|
|
```
|
|
|
```python
|
|
|
for stderr_file in [glob(dir+'-stdout') for dir in [job_number_to_dir('/scratch/flo/jobs',job) for job in range(77666696, 77667532)]]: check_call('tail -n 1 {}'.format(stderr_file[0]).split(' '))
|
|
|
|
|
|
logfile_obj = open(logpath,'w')
|
|
|
|
|
|
# Write the geocat output to a log file, and parse it to determine the output
|
|
|
# HDF4 files.
|
|
|
hdf_files = []
|
|
|
for line in exe_out.splitlines():
|
|
|
logfile_obj.write(line+"\n")
|
|
|
searchObj = re.search( r'geocat[LR].*\.hdf', line, re.M)
|
|
|
if searchObj:
|
|
|
hdf_files.append(string.split(line," ")[-1])
|
|
|
else:
|
|
|
pass
|
|
|
|
|
|
logfile_obj.close()
|
|
|
``` |