Skip to content
Snippets Groups Projects
Commit 6b3edcd3 authored by tomrink's avatar tomrink
Browse files

snapshot...

parent f67395cc
No related branches found
No related tags found
No related merge requests found
...@@ -6,9 +6,12 @@ import os, datetime ...@@ -6,9 +6,12 @@ import os, datetime
import numpy as np import numpy as np
import xarray as xr import xarray as xr
import pickle import pickle
import h5py
from deeplearning.amv_raob import get_bounding_gfs_files, convert_file, get_images, get_interpolated_profile, \ from deeplearning.amv_raob import get_bounding_gfs_files, convert_file, get_images, get_interpolated_profile, get_time_tuple_utc, get_profile
split_matchup, shuffle_dict, get_interpolated_scalar, get_num_samples, get_time_tuple_utc, get_profile
from icing.pirep_goes import split_data
from icing.pirep_goes import train_params_day
LOG_DEVICE_PLACEMENT = False LOG_DEVICE_PLACEMENT = False
...@@ -49,70 +52,6 @@ img_width = 24 ...@@ -49,70 +52,6 @@ img_width = 24
NUM_VERT_LEVELS = 26 NUM_VERT_LEVELS = 26
NUM_VERT_PARAMS = 2 NUM_VERT_PARAMS = 2
# Normalization statistics for the temperature profile: one value per level,
# 26 values in total, stored as a single row of shape (1, 26).
# NOTE(review): 26 matches NUM_VERT_LEVELS; values look like Kelvin -- confirm.
gfs_mean_temp = np.array([
    225.481110, 218.950729, 215.830338, 212.063187, 209.348038,
    208.787033, 213.728928, 218.298264, 223.061020, 229.190445,
    236.095215, 242.589493, 248.333237, 253.357071, 257.768646,
    261.599396, 264.793671, 267.667603, 270.408478, 272.841919,
    274.929138, 276.826294, 277.786865, 278.834198, 279.980408,
    281.308380,
]).reshape(1, -1)

# Matching per-level standard deviations, same (1, 26) layout.
gfs_std_temp = np.array([
    13.037852, 11.669035, 10.775956, 10.428216, 11.705231,
    12.352798, 8.892235, 7.101064, 8.505628, 10.815929,
    12.139559, 12.720000, 12.929382, 13.023590, 13.135534,
    13.543551, 14.449997, 15.241049, 15.638563, 15.943666,
    16.178715, 16.458992, 16.700863, 17.109579, 17.630177,
    18.080544,
]).reshape(1, -1)

# Parameter name -> (mean, std). The profile entry holds the arrays above,
# the remaining entries hold plain scalars.
mean_std_dict = {
    'temperature': (gfs_mean_temp, gfs_std_temp),
    'surface temperature': (279.35, 22.81),
    'MSL pressure': (1010.64, 13.46),
    'tropopause temperature': (208.17, 11.36),
    'tropopause pressure': (219.62, 78.79),
}

# Parameter name -> (lower bound, upper bound).
valid_range_dict = {
    'temperature': (150, 350),
    'surface temperature': (150, 350),
    'MSL pressure': (800, 1050),
    'tropopause temperature': (150, 250),
    'tropopause pressure': (100, 500),
}
def build_residual_block(input, drop_rate, num_neurons, activation, block_name, doDropout=True, doBatchNorm=True): def build_residual_block(input, drop_rate, num_neurons, activation, block_name, doDropout=True, doBatchNorm=True):
with tf.name_scope(block_name): with tf.name_scope(block_name):
...@@ -175,7 +114,8 @@ class IcingIntensityNN: ...@@ -175,7 +114,8 @@ class IcingIntensityNN:
self.handle = None self.handle = None
self.inner_handle = None self.inner_handle = None
self.in_mem_batch = None self.in_mem_batch = None
self.matchup_dict = None self.filename = None
self.h5f = None
self.logits = None self.logits = None
...@@ -219,15 +159,16 @@ class IcingIntensityNN: ...@@ -219,15 +159,16 @@ class IcingIntensityNN:
self.initial_learning_rate = None self.initial_learning_rate = None
n_chans = len(abi_channels) n_chans = len(abi_channels)
NUM_PARAMS = 1
if TRIPLET: if TRIPLET:
n_chans *= 3 n_chans *= 3
self.X_img = tf.keras.Input(shape=(img_width, img_width, n_chans)) self.X_img = tf.keras.Input(shape=(img_width, img_width, n_chans))
#self.X_img = tf.keras.Input(shape=NUM_PARAMS)
self.X_prof = tf.keras.Input(shape=(NUM_VERT_LEVELS, NUM_VERT_PARAMS)) self.X_prof = tf.keras.Input(shape=(NUM_VERT_LEVELS, NUM_VERT_PARAMS))
self.X_sfc = tf.keras.Input(shape=2) self.X_sfc = tf.keras.Input(shape=2)
self.inputs.append(self.X_img) self.inputs.append(self.X_img)
self.inputs.append(self.X_prof) self.inputs.append(self.X_prof)
self.inputs.append(self.X_sfc)
self.DISK_CACHE = True self.DISK_CACHE = True
...@@ -251,207 +192,77 @@ class IcingIntensityNN: ...@@ -251,207 +192,77 @@ class IcingIntensityNN:
# Memory growth must be set before GPUs have been initialized # Memory growth must be set before GPUs have been initialized
print(e) print(e)
def get_in_mem_data_batch(self, keys):
    """Read one processing batch of predictors and labels from ``self.h5f``.

    Parameters
    ----------
    keys : array-like of int
        Row indexes to read. They are sorted before use, so the returned
        rows follow ascending index order rather than the order given.

    Returns
    -------
    tuple
        ``(data, data, label)``. ``data`` is a float64 array with one row
        per index and one column per entry of ``train_params_day``; the
        same array is returned in the first two slots. ``label`` is an
        int32 array of 0/1 values.
    """
    # h5py only accepts index lists in increasing order, so sort first.
    nd_keys = np.sort(np.asarray(keys))

    columns = [self.h5f[param][nd_keys, ] for param in train_params_day]
    data = np.stack(columns)
    # (n_params, n_rows) -> (n_rows, n_params)
    data = np.transpose(data, axes=(1, 0))
    # data_function declares these outputs as tf.float64 to
    # tf.numpy_function, which rejects any other dtype at run time.
    data = data.astype(np.float64)

    raw_label = self.h5f['icing_intensity'][nd_keys]
    # Binary target: -1 and 0 map to class 0, every other value to class 1.
    label = np.where((raw_label == -1) | (raw_label == 0), 0, 1)
    # Declared as tf.int32 by data_function.
    label = label.astype(np.int32)

    # TODO: Implement in memory cache

    return data, data, label
@tf.function(input_signature=[tf.TensorSpec(None, tf.int32)])
def data_function(self, indexes):
    """Wrap ``get_in_mem_data_batch`` so it can run inside a tf.data pipeline.

    Parameters
    ----------
    indexes : tf.Tensor
        int32 tensor of row indexes, as fixed by the ``input_signature``.

    Returns
    -------
    list of tf.Tensor
        Three tensors typed float64, float64 and int32, in the order
        returned by ``get_in_mem_data_batch``.
    """
    # The output dtype list must match what the Python function returns.
    out = tf.numpy_function(
        self.get_in_mem_data_batch,
        [indexes],
        [tf.float64, tf.float64, tf.int32],
    )
    return out
def get_train_dataset(self, indexes):
    """Build the training dataset and store it on ``self.train_dataset``.

    Parameters
    ----------
    indexes : iterable of int
        Row indexes that make up the training set.
    """
    # data_function is traced with an int32 input_signature, so hand the
    # pipeline int32 indexes whatever integer type the caller used.
    indexes = np.asarray(list(indexes), dtype=np.int32)

    dataset = tf.data.Dataset.from_tensor_slices(indexes)
    # Group indexes into processing batches; each batch is loaded by one
    # call to data_function.
    dataset = dataset.batch(PROC_BATCH_SIZE)
    dataset = dataset.map(self.data_function, num_parallel_calls=8)
    # Shuffling happens after batching, so whole processing batches are
    # reordered rather than individual samples.
    dataset = dataset.shuffle(PROC_BATCH_BUFFER_SIZE)
    dataset = dataset.prefetch(buffer_size=1)
    self.train_dataset = dataset
def get_test_dataset(self, indexes):
    """Build the test dataset and store it on ``self.test_dataset``.

    Unlike the training pipeline, this one applies no shuffle and no
    prefetch.

    Parameters
    ----------
    indexes : iterable of int
        Row indexes that make up the test set.
    """
    # data_function is traced with an int32 input_signature, so hand the
    # pipeline int32 indexes whatever integer type the caller used.
    indexes = np.asarray(list(indexes), dtype=np.int32)

    dataset = tf.data.Dataset.from_tensor_slices(indexes)
    dataset = dataset.batch(PROC_BATCH_SIZE)
    dataset = dataset.map(self.data_function, num_parallel_calls=8)
    self.test_dataset = dataset
def setup_pipeline(self, filename, train_idxs=None, test_idxs=None):
    """Open the HDF5 file and build the train and test datasets.

    Parameters
    ----------
    filename : str
        Path of the HDF5 file to read. It must hold a ``time`` dataset
        whose first dimension is the number of observations.
    train_idxs : array-like of int, optional
        Row indexes to train on. When omitted, the training part of
        ``split_data(num_obs, skip=8)`` is used.
    test_idxs : array-like of int, optional
        Row indexes to test on. When omitted, the test part of
        ``split_data(num_obs, skip=8)`` is used.

    Notes
    -----
    Supplying only one of the two index sets pairs it with the automatic
    split for the other; the caller must make sure the two do not overlap.
    """
    self.filename = filename
    # Left open on purpose: get_in_mem_data_batch reads from self.h5f each
    # time the dataset is iterated.
    self.h5f = h5py.File(filename, 'r')

    # Only fall back to the automatic split for whichever set is missing;
    # previously both arguments were accepted but silently ignored.
    if train_idxs is None or test_idxs is None:
        num_obs = self.h5f['time'].shape[0]
        split_trn, split_tst = split_data(num_obs, skip=8)
        if train_idxs is None:
            train_idxs = split_trn
        if test_idxs is None:
            test_idxs = split_tst

    trn_idxs = np.asarray(train_idxs)
    tst_idxs = np.asarray(test_idxs)
    self.num_data_samples = trn_idxs.shape[0]

    self.get_train_dataset(trn_idxs)
    self.get_test_dataset(tst_idxs)

    print('num train samples: ', self.num_data_samples)
    print('BATCH SIZE: ', BATCH_SIZE)
    print('num test samples: ', tst_idxs.shape[0])
    print('setup_pipeline: Done')
def build_1d_cnn(self): def build_1d_cnn(self):
...@@ -615,7 +426,7 @@ class IcingIntensityNN: ...@@ -615,7 +426,7 @@ class IcingIntensityNN:
@tf.function @tf.function
def train_step(self, mini_batch): def train_step(self, mini_batch):
inputs = [mini_batch[0], mini_batch[1], mini_batch[3]] inputs = [mini_batch[0], mini_batch[1], mini_batch[2]]
labels = mini_batch[2] labels = mini_batch[2]
with tf.GradientTape() as tape: with tf.GradientTape() as tape:
pred = self.model(inputs, training=True) pred = self.model(inputs, training=True)
...@@ -634,7 +445,7 @@ class IcingIntensityNN: ...@@ -634,7 +445,7 @@ class IcingIntensityNN:
@tf.function @tf.function
def test_step(self, mini_batch): def test_step(self, mini_batch):
inputs = [mini_batch[0], mini_batch[1], mini_batch[3]] inputs = [mini_batch[0], mini_batch[1]]
labels = mini_batch[2] labels = mini_batch[2]
pred = self.model(inputs, training=False) pred = self.model(inputs, training=False)
t_loss = self.loss(labels, pred) t_loss = self.loss(labels, pred)
...@@ -643,7 +454,7 @@ class IcingIntensityNN: ...@@ -643,7 +454,7 @@ class IcingIntensityNN:
self.test_accuracy(labels, pred) self.test_accuracy(labels, pred)
def predict(self, mini_batch): def predict(self, mini_batch):
inputs = [mini_batch[0], mini_batch[1], mini_batch[3]] inputs = [mini_batch[0], mini_batch[1]]
labels = mini_batch[2] labels = mini_batch[2]
pred = self.model(inputs, training=False) pred = self.model(inputs, training=False)
t_loss = self.loss(labels, pred) t_loss = self.loss(labels, pred)
...@@ -674,8 +485,8 @@ class IcingIntensityNN: ...@@ -674,8 +485,8 @@ class IcingIntensityNN:
proc_batch_cnt = 0 proc_batch_cnt = 0
n_samples = 0 n_samples = 0
for abi, temp, lbfp, sfc in self.train_dataset: for abi, temp, lbfp in self.train_dataset:
trn_ds = tf.data.Dataset.from_tensor_slices((abi, temp, lbfp, sfc)) trn_ds = tf.data.Dataset.from_tensor_slices((abi, temp, lbfp))
trn_ds = trn_ds.batch(BATCH_SIZE) trn_ds = trn_ds.batch(BATCH_SIZE)
for mini_batch in trn_ds: for mini_batch in trn_ds:
if self.learningRateSchedule is not None: if self.learningRateSchedule is not None:
...@@ -691,8 +502,8 @@ class IcingIntensityNN: ...@@ -691,8 +502,8 @@ class IcingIntensityNN:
self.test_loss.reset_states() self.test_loss.reset_states()
self.test_accuracy.reset_states() self.test_accuracy.reset_states()
for abi_tst, temp_tst, lbfp_tst, sfc_tst in self.test_dataset: for abi_tst, temp_tst, lbfp_tst in self.test_dataset:
tst_ds = tf.data.Dataset.from_tensor_slices((abi_tst, temp_tst, lbfp_tst, sfc_tst)) tst_ds = tf.data.Dataset.from_tensor_slices((abi_tst, temp_tst, lbfp_tst))
tst_ds = tst_ds.batch(BATCH_SIZE) tst_ds = tst_ds.batch(BATCH_SIZE)
for mini_batch_test in tst_ds: for mini_batch_test in tst_ds:
self.test_step(mini_batch_test) self.test_step(mini_batch_test)
...@@ -718,8 +529,8 @@ class IcingIntensityNN: ...@@ -718,8 +529,8 @@ class IcingIntensityNN:
self.test_loss.reset_states() self.test_loss.reset_states()
self.test_accuracy.reset_states() self.test_accuracy.reset_states()
for abi, temp, lbfp, sfc in self.test_dataset: for abi, temp, lbfp in self.test_dataset:
ds = tf.data.Dataset.from_tensor_slices((abi, temp, lbfp, sfc)) ds = tf.data.Dataset.from_tensor_slices((abi, temp, lbfp))
ds = ds.batch(BATCH_SIZE) ds = ds.batch(BATCH_SIZE)
for mini_batch in ds: for mini_batch in ds:
self.test_step(mini_batch) self.test_step(mini_batch)
...@@ -754,16 +565,16 @@ class IcingIntensityNN: ...@@ -754,16 +565,16 @@ class IcingIntensityNN:
self.test_loss.reset_states() self.test_loss.reset_states()
self.test_accuracy.reset_states() self.test_accuracy.reset_states()
for abi_tst, temp_tst, lbfp_tst, sfc_tst in self.test_dataset: for abi_tst, temp_tst, lbfp_tst in self.test_dataset:
ds = tf.data.Dataset.from_tensor_slices((abi_tst, temp_tst, lbfp_tst, sfc_tst)) ds = tf.data.Dataset.from_tensor_slices((abi_tst, temp_tst, lbfp_tst))
ds = ds.batch(BATCH_SIZE) ds = ds.batch(BATCH_SIZE)
for mini_batch_test in ds: for mini_batch_test in ds:
self.predict(mini_batch_test) self.predict(mini_batch_test)
print('loss, acc: ', self.test_loss.result(), self.test_accuracy.result()) print('loss, acc: ', self.test_loss.result(), self.test_accuracy.result())
def run(self, matchup_dict, train_dict=None, valid_dict=None): def run(self, filename, train_dict=None, valid_dict=None):
with tf.device('/device:GPU:'+str(self.gpu_device)): with tf.device('/device:GPU:'+str(self.gpu_device)):
self.setup_pipeline(matchup_dict, train_dict=train_dict, valid_test_dict=valid_dict) self.setup_pipeline(filename, train_idxs=train_dict, test_idxs=valid_dict)
self.build_model() self.build_model()
self.build_training() self.build_training()
self.build_evaluation() self.build_evaluation()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment