Skip to content
Snippets Groups Projects
Commit 932df900 authored by David Hoese's avatar David Hoese
Browse files

Add initial notebook examples

parent 381a0bb2
Branches
No related tags found
No related merge requests found
%% Cell type:code id: tags:
``` python
import os
# Both environment variables are set before satpy is imported below.
# NOTE(review): presumably the HDF5 library and satpy read these settings at
# import / first use, which is why they precede the imports - confirm.
os.environ['HDF5_USE_FILE_LOCKING'] = "FALSE"
os.environ['PYTROLL_CHUNK_SIZE'] = "2048"
from glob import glob
from satpy import MultiScene
import dask
from multiprocessing.pool import ThreadPool
from dask.diagnostics import ProgressBar
```
%% Cell type:code id: tags:
``` python
# Active selection: only the files whose names match '*C01*s201901712*.nc'
# in the 2019-01-17 RadC directory.
input_files = glob(
    '/arcdata/goes/grb/goes16/2019/2019_01_17_017/abi/L1b/RadC/*C01*s201901712*.nc'
)
# Alternative selections, kept disabled for reference:
# input_files = glob('/arcdata/goes/grb/goes16/2019/2019_01_17_017/abi/L1b/RadC/*s2019017[12]*.nc')
#input_files = glob('/arcdata/goes/grb/goes16/2019/2019_01_18_018/abi/L1b/RadC/*s2019018[12]*.nc')
#input_files += glob('/arcdata/goes/grb/goes16/2019/2019_01_19_019/abi/L1b/RadC/*s20190190*.nc')

# Report how many files matched.
print(len(input_files))
```
%% Output
12
%% Cell type:code id: tags:
``` python
# Build a MultiScene from the matched files using the 'abi_l1b' reader.
mscn = MultiScene.from_files(input_files, reader='abi_l1b')
# Request only the 'C01' dataset.
mscn.load(['C01'])
# Alternative composite + resampling path, currently disabled.
# NOTE(review): a later cell iterates over `res_mscn`, which is only defined
# when the resample line below is re-enabled.
# mscn.load(['true_color_night'])
# res_mscn = mscn.resample(resampler='native')
```
%% Cell type:code id: tags:
``` python
import sys
from dask_jobqueue import SLURMCluster

# Describe the SLURM jobs that will host the Dask workers.
cluster = SLURMCluster(
    partition='all',
    walltime='02:00:00',
    name='davidh_dask',
    cores=4,
    processes=2,
    memory='20GB',
    # Use the interpreter running this notebook
    # (e.g. '/home/davidh/miniconda3/envs/pangeo/bin/python').
    python=sys.executable,
    local_directory='/scratch',
    # Dashboard port comes from $DASK_PORT when set, otherwise 8787.
    diagnostics_port=int(os.environ.get('DASK_PORT', 8787)),
)
cluster.scale(5)
# Last expression of the cell: show the cluster object.
cluster
```
%% Output
/home/davidh/miniconda3/envs/pangeo/lib/python3.6/site-packages/distributed/bokeh/core.py:56: UserWarning:
Port 33121 is already in use.
Perhaps you already have a cluster running?
Hosting the diagnostics dashboard on a random port instead.
warnings.warn('\n' + msg)
%% Cell type:code id: tags:
``` python
from dask.distributed import Client
# Connect a client to the SLURM-backed cluster created in the previous cell.
client = Client(cluster)
# Last expression of the cell: show the client summary.
client
```
%% Output
<Client: scheduler='tcp://10.23.255.247:33990' processes=0 cores=0>
%% Cell type:code id: tags:
``` python
import numpy as np
# Mean of the 'C01' dataset across all scenes: the per-scene datasets are
# gathered into a list and averaged along axis 0 (the scene axis of that list).
# NOTE(review): np.mean on a Python list presumably converts every scene's
# data to an in-memory NumPy array first - confirm this eager evaluation is
# intended rather than a lazy dask/xarray mean.
c01_avg = np.mean([scn['C01'] for scn in mscn], axis=0)
```
%% Cell type:code id: tags:
``` python
# Display the averaged C01 result.
c01_avg
```
%% Cell type:code id: tags:
``` python
%%time
all_jobs = []
for idx, scn in enumerate(res_mscn):
print("Scene {:d}".format(idx))
res = scn.save_datasets(writer='scmi', base_dir='/odyssey/isis/tmp/davidh', sector_id="GOES_EAST", source_name="SSEC", compute=False)
all_jobs.append(res)
```
%% Output
Scene 0
Scene 1
Scene 2
Scene 3
Scene 4
Scene 5
Scene 6
Scene 7
Scene 8
Scene 9
Scene 10
CPU times: user 39.1 s, sys: 6.36 s, total: 45.4 s
Wall time: 1min 7s
%% Cell type:code id: tags:
``` python
# Each entry of `all_jobs` is itself a sequence of per-scene results;
# collapse them into one flat list.
all_jobs = [job for scene_jobs in all_jobs for job in scene_jobs]
print(len(all_jobs))
# Last expression of the cell: show the flattened list.
all_jobs
```
%% Cell type:code id: tags:
``` python
# Execute all queued writer results in a single dask.compute() call.
dask.compute(all_jobs)
```
%% Cell type:code id: tags:
``` python
%%time
mscn.save_animation('/odyssey/isis/tmp/davidh/g16_abi_{name}_{start_time:%Y%m%d_%H%M%S}/g16_abi_{name}_{start_time:%Y%m%d_%H%M%S}.mp4', fps=24, batch_size=16)
```
%% Cell type:code id: tags:
``` python
# Shell escape: run the external png2mp4.sh script with an output .mp4 path
# followed by a glob of PNG files.
# NOTE(review): presumably the script assembles the PNG frames into the MP4 - confirm.
!png2mp4.sh /odyssey/isis/tmp/davidh/g16_abi_true_color_night.mp4 /odyssey/isis/tmp/davidh/g16_abi_true_color_night_2*/*.png
```
%% Cell type:code id: tags:
``` python
# Shut the client and the cluster down explicitly before dropping the names.
# Both objects were displayed as cell outputs earlier, so the notebook's
# output history may still reference them; `del` alone only unbinds the
# names and would not reliably release the SLURM jobs.
client.close()
cluster.close()
del client
del cluster
```
%% Cell type:code id: tags:
``` python
```
%% Cell type:markdown id: tags:
# Basic Dask Test on a Slurm Cluster
See additional options for Dask's SLURMCluster class [here](http://jobqueue.dask.org/en/latest/generated/dask_jobqueue.SLURMCluster.html).
Note that the `diagnostics_port` keyword argument is only available in dask-jobqueue > 0.4.1, which as of this writing has not been released.
%% Cell type:code id: tags:
``` python
import os
import sys
from dask_jobqueue import SLURMCluster

# Describe the SLURM jobs that will host the Dask workers.
cluster = SLURMCluster(
    partition='all',
    walltime='02:00:00',
    name='davidh_dask_pangeo',
    cores=4,
    processes=2,
    memory='20GB',
    # Use the interpreter running this notebook
    # (e.g. '/home/davidh/miniconda3/envs/pangeo/bin/python').
    python=sys.executable,
    local_directory='/scratch',
    # Dashboard port comes from $DASK_PORT when set, otherwise 8787.
    diagnostics_port=int(os.environ.get('DASK_PORT', 8787)),
)
cluster.scale(5)
```
%% Cell type:code id: tags:
``` python
# Display the cluster object.
cluster
```
%% Output
%% Cell type:code id: tags:
``` python
from dask.distributed import Client
# Connect a client to the SLURM-backed cluster created above.
client = Client(cluster)
```
%% Cell type:code id: tags:
``` python
# Display the client summary.
client
```
%% Output
<Client: scheduler='tcp://10.23.255.247:47704' processes=6 cores=12>
%% Cell type:code id: tags:
``` python
import dask.array as da
def dask_test():
    """Run a QR factorization round-trip as a quick Dask smoke test.

    Builds a random (50000, 1000) Dask array in (1000, 1000) chunks, factors
    it with ``da.linalg.qr``, multiplies the two factors back together and
    triggers the computation of that product.

    Returns:
        None. The computed array is discarded; the function exists only to
        generate work that can be timed.
    """
    a = da.random.random(size=(50000, 1000), chunks=(1000, 1000))
    q, r = da.linalg.qr(a)
    a2 = q.dot(r)
    # The result itself is not needed, so it is not bound to a name.
    a2.compute()
```
%% Cell type:code id: tags:
``` python
%%time
# Time one run of the QR smoke test defined in the previous cell.
dask_test()
```
%% Output
CPU times: user 4.31 s, sys: 1.36 s, total: 5.67 s
Wall time: 15.6 s
%% Cell type:code id: tags:
``` python
```
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment