Commit 85c889d9 authored by tomrink

initial commit

parent 90bf7781
import tensorflow as tf
from util.setup import logdir, modeldir, cachepath
from util.util import homedir
import os, datetime
import numpy as np
import pickle
import h5py
from icing.pirep_goes import split_data, normalize
LOG_DEVICE_PLACEMENT = False
CACHE_DATA_IN_MEM = True
PROC_BATCH_SIZE = 2046
PROC_BATCH_BUFFER_SIZE = 50000
NumLabels = 1
BATCH_SIZE = 256
NUM_EPOCHS = 50
TRACK_MOVING_AVERAGE = False
TRIPLET = False
CONV3D = False
img_width = 16
mean_std_file = homedir+'data/icing/mean_std_l1b_no_ice.pkl'
f = open(mean_std_file, 'rb')
mean_std_dct = pickle.load(f)
f.close()
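# mean_std_dct is assumed to map each parameter name in train_params to its
# training-set statistics (e.g. mean and standard deviation); normalize()
# from icing.pirep_goes consumes it. A minimal sketch of the z-score scaling
# it is assumed to perform (the real implementation may differ):
#
#   def _zscore(nda, param, stats_dct):
#       mean, std = stats_dct[param][0], stats_dct[param][1]
#       return (nda - mean) / std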
# train_params = ['cld_height_acha', 'cld_geo_thick', 'supercooled_cloud_fraction', 'cld_temp_acha', 'cld_press_acha',
# 'cld_reff_dcomp', 'cld_opd_dcomp', 'cld_cwp_dcomp', 'iwc_dcomp', 'lwc_dcomp']
# #'cloud_phase']
train_params = ['temp_10_4um_nom', 'temp_11_0um_nom', 'temp_12_0um_nom', 'temp_13_3um_nom', 'temp_3_75um_nom',
'temp_6_2um_nom', 'temp_6_7um_nom', 'temp_7_3um_nom', 'temp_8_5um_nom', 'temp_9_7um_nom',
'refl_0_47um_nom', 'refl_0_65um_nom', 'refl_0_86um_nom', 'refl_1_38um_nom', 'refl_1_60um_nom']
def build_residual_block(x, drop_rate, num_neurons, activation, block_name, doDropout=True, doBatchNorm=True):
    # Fully connected residual block: four Dense layers with optional Dropout
    # and BatchNorm, plus a skip connection from the first layer's output to
    # the last layer's pre-activation output. (The first parameter is named x
    # rather than input to avoid shadowing the Python builtin.)
    with tf.name_scope(block_name):
        if doDropout:
            fc = tf.keras.layers.Dropout(drop_rate)(x)
            fc = tf.keras.layers.Dense(num_neurons, activation=activation)(fc)
        else:
            fc = tf.keras.layers.Dense(num_neurons, activation=activation)(x)
if doBatchNorm:
fc = tf.keras.layers.BatchNormalization()(fc)
print(fc.shape)
fc_skip = fc
if doDropout:
fc = tf.keras.layers.Dropout(drop_rate)(fc)
fc = tf.keras.layers.Dense(num_neurons, activation=activation)(fc)
if doBatchNorm:
fc = tf.keras.layers.BatchNormalization()(fc)
print(fc.shape)
if doDropout:
fc = tf.keras.layers.Dropout(drop_rate)(fc)
fc = tf.keras.layers.Dense(num_neurons, activation=activation)(fc)
if doBatchNorm:
fc = tf.keras.layers.BatchNormalization()(fc)
print(fc.shape)
if doDropout:
fc = tf.keras.layers.Dropout(drop_rate)(fc)
fc = tf.keras.layers.Dense(num_neurons, activation=None)(fc)
if doBatchNorm:
fc = tf.keras.layers.BatchNormalization()(fc)
fc = fc + fc_skip
fc = tf.keras.layers.LeakyReLU()(fc)
print(fc.shape)
return fc
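# A minimal usage sketch (hypothetical feature width) showing how one residual
# block wires into a standalone Keras functional model; defined here for
# illustration only and never called in this module.
def _residual_block_demo(n_features=30):
    inp = tf.keras.Input(shape=(n_features,))
    out = build_residual_block(inp, 0.5, 2 * n_features, tf.nn.leaky_relu, 'Demo_Block')
    return tf.keras.Model(inp, out)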
class IcingIntensityNN:
def __init__(self, gpu_device=0, datapath=None):
self.train_data = None
self.train_label = None
self.test_data = None
self.test_label = None
self.test_data_denorm = None
self.train_dataset = None
self.inner_train_dataset = None
self.test_dataset = None
self.X_img = None
self.X_prof = None
self.X_u = None
self.X_v = None
self.X_sfc = None
self.inputs = []
self.y = None
self.handle = None
self.inner_handle = None
self.in_mem_batch = None
self.filename = None
self.h5f = None
self.h5f_l1b = None
self.logits = None
self.predict_data = None
self.predict_dataset = None
self.mean_list = None
self.std_list = None
self.training_op = None
self.correct = None
self.accuracy = None
self.loss = None
self.pred_class = None
self.gpu_device = gpu_device
self.variable_averages = None
self.global_step = None
self.writer_train = None
self.writer_valid = None
self.OUT_OF_RANGE = False
self.abi = None
self.temp = None
self.wv = None
self.lbfp = None
self.sfc = None
self.in_mem_data_cache = {}
self.model = None
self.optimizer = None
self.train_loss = None
self.train_accuracy = None
self.test_loss = None
self.test_accuracy = None
self.test_auc = None
self.test_recall = None
self.test_precision = None
self.learningRateSchedule = None
self.num_data_samples = None
self.initial_learning_rate = None
n_chans = len(train_params)
NUM_PARAMS = n_chans
if TRIPLET:
n_chans *= 3
self.X_img = tf.keras.Input(shape=(img_width, img_width, n_chans))
self.inputs.append(self.X_img)
self.DISK_CACHE = False
if datapath is not None:
self.DISK_CACHE = False
f = open(datapath, 'rb')
self.in_mem_data_cache = pickle.load(f)
f.close()
tf.debugging.set_log_device_placement(LOG_DEVICE_PLACEMENT)
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
try:
# Currently, memory growth needs to be the same across GPUs
for gpu in gpus:
tf.config.experimental.set_memory_growth(gpu, True)
logical_gpus = tf.config.experimental.list_logical_devices('GPU')
print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPUs")
except RuntimeError as e:
# Memory growth must be set before GPUs have been initialized
print(e)
def get_in_mem_data_batch(self, idxs):
key = frozenset(idxs)
if CACHE_DATA_IN_MEM:
tup = self.in_mem_data_cache.get(key)
if tup is not None:
return tup[0], tup[1]
# sort these to use as numpy indexing arrays
nd_idxs = np.array(idxs)
nd_idxs = np.sort(nd_idxs)
data = []
for param in train_params:
nda = self.h5f[param][nd_idxs, ]
nda = normalize(nda, param, mean_std_dct)
data.append(nda)
data = np.stack(data)
data = data.astype(np.float32)
data = np.transpose(data, axes=(1, 0))
label = self.h5f['icing_intensity'][nd_idxs]
label = label.astype(np.int32)
label = np.where(label == -1, 0, label)
# binary, two class
label = np.where(label != 0, 1, label)
label = label.reshape((label.shape[0], 1))
# keep = (label == 0) | (label == 3) | (label == 4) | (label == 5) | (label == 6)
# data = data[keep,]
# label = label[keep]
# label = np.where(label != 0, 1, label)
# label = label.reshape((label.shape[0], 1))
if CACHE_DATA_IN_MEM:
self.in_mem_data_cache[key] = (data, label)
return data, label
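    # Shape sketch for get_in_mem_data_batch (assuming each train_params
    # dataset in the HDF5 file stores one value per observation): a batch of
    # B indices yields
    #   data  -> float32, shape (B, len(train_params))
    #   label -> int32,  shape (B, 1), values in {0, 1}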
@tf.function(input_signature=[tf.TensorSpec(None, tf.int32)])
def data_function(self, indexes):
out = tf.numpy_function(self.get_in_mem_data_batch, [indexes], [tf.float32, tf.int32])
return out
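    # tf.numpy_function lets the eager h5py/numpy loader above run inside the
    # tf.data pipeline; the trade-off is that the resulting tensors carry no
    # static shape information.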
def get_train_dataset(self, indexes):
indexes = list(indexes)
dataset = tf.data.Dataset.from_tensor_slices(indexes)
dataset = dataset.batch(PROC_BATCH_SIZE)
dataset = dataset.map(self.data_function, num_parallel_calls=8)
dataset = dataset.shuffle(PROC_BATCH_BUFFER_SIZE)
dataset = dataset.prefetch(buffer_size=1)
self.train_dataset = dataset
def get_test_dataset(self, indexes):
indexes = list(indexes)
dataset = tf.data.Dataset.from_tensor_slices(indexes)
dataset = dataset.batch(PROC_BATCH_SIZE)
dataset = dataset.map(self.data_function, num_parallel_calls=8)
self.test_dataset = dataset
    def setup_pipeline(self, filename, train_idxs=None, test_idxs=None):
        # NOTE: train_idxs/test_idxs are accepted but currently unused; the
        # train/test split always comes from split_data below.
self.filename = filename
self.h5f = h5py.File(filename, 'r')
time = self.h5f['time']
num_obs = time.shape[0]
trn_idxs, tst_idxs = split_data(num_obs, skip=4)
self.num_data_samples = trn_idxs.shape[0]
self.get_train_dataset(trn_idxs)
self.get_test_dataset(tst_idxs)
print('num train samples: ', self.num_data_samples)
print('BATCH SIZE: ', BATCH_SIZE)
print('num test samples: ', tst_idxs.shape[0])
print('setup_pipeline: Done')
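    # split_data (from icing.pirep_goes) is assumed to partition the
    # observation indices into train and test sets, with skip=4 controlling
    # the sampling stride; its exact behavior is defined in that module.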
def build_1d_cnn(self):
print('build_1d_cnn')
# padding = 'VALID'
padding = 'SAME'
# activation = tf.nn.relu
# activation = tf.nn.elu
activation = tf.nn.leaky_relu
num_filters = 6
        # NOTE: this branch reads self.inputs[1], a second model input that
        # __init__ does not currently create; it only works if the 1-D branch
        # is re-enabled in build_model() and that input is appended.
        conv = tf.keras.layers.Conv1D(num_filters, 5, strides=1, padding=padding)(self.inputs[1])
conv = tf.keras.layers.MaxPool1D(padding=padding)(conv)
print(conv)
num_filters *= 2
conv = tf.keras.layers.Conv1D(num_filters, 3, strides=1, padding=padding)(conv)
conv = tf.keras.layers.MaxPool1D(padding=padding)(conv)
print(conv)
num_filters *= 2
conv = tf.keras.layers.Conv1D(num_filters, 3, strides=1, padding=padding)(conv)
conv = tf.keras.layers.MaxPool1D(padding=padding)(conv)
print(conv)
num_filters *= 2
conv = tf.keras.layers.Conv1D(num_filters, 3, strides=1, padding=padding)(conv)
conv = tf.keras.layers.MaxPool1D(padding=padding)(conv)
print(conv)
flat = tf.keras.layers.Flatten()(conv)
print(flat)
return flat
def build_cnn(self):
print('build_cnn')
# padding = "VALID"
padding = "SAME"
# activation = tf.nn.relu
# activation = tf.nn.elu
activation = tf.nn.leaky_relu
momentum = 0.99
num_filters = 8
conv = tf.keras.layers.Conv2D(num_filters, 5, strides=[1, 1], padding=padding, activation=activation)(self.inputs[0])
conv = tf.keras.layers.MaxPool2D(padding=padding)(conv)
conv = tf.keras.layers.BatchNormalization()(conv)
print(conv.shape)
num_filters *= 2
conv = tf.keras.layers.Conv2D(num_filters, 3, strides=[1, 1], padding=padding, activation=activation)(conv)
conv = tf.keras.layers.MaxPool2D(padding=padding)(conv)
conv = tf.keras.layers.BatchNormalization()(conv)
print(conv.shape)
num_filters *= 2
conv = tf.keras.layers.Conv2D(num_filters, 3, strides=[1, 1], padding=padding, activation=activation)(conv)
conv = tf.keras.layers.MaxPool2D(padding=padding)(conv)
conv = tf.keras.layers.BatchNormalization()(conv)
print(conv.shape)
num_filters *= 2
conv = tf.keras.layers.Conv2D(num_filters, 3, strides=[1, 1], padding=padding, activation=activation)(conv)
conv = tf.keras.layers.MaxPool2D(padding=padding)(conv)
conv = tf.keras.layers.BatchNormalization()(conv)
print(conv.shape)
# num_filters *= 2
# conv = tf.keras.layers.Conv2D(num_filters, 3, strides=[1, 1], padding=padding, activation=activation)(conv)
# conv = tf.keras.layers.MaxPool2D(padding=padding)(conv)
# conv = tf.keras.layers.BatchNormalization()(conv)
# print(conv.shape)
flat = tf.keras.layers.Flatten()(conv)
return flat
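    # Shape walk-through for the 16x16 input above: each MaxPool2D halves the
    # spatial dims (default 2x2 pool, stride 2, SAME padding), so 16 -> 8 ->
    # 4 -> 2 -> 1, while the filter count grows 8 -> 16 -> 32 -> 64; Flatten
    # therefore emits a 64-feature vector per sample.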
def build_dnn(self, input_layer=None):
print('build fully connected layer')
drop_rate = 0.5
# activation = tf.nn.relu
# activation = tf.nn.elu
activation = tf.nn.leaky_relu
momentum = 0.99
if input_layer is not None:
flat = input_layer
n_hidden = input_layer.shape[1]
else:
flat = self.X_img
n_hidden = self.X_img.shape[1]
fac = 2
fc = build_residual_block(flat, drop_rate, fac*n_hidden, activation, 'Residual_Block_1', doBatchNorm=True)
fc = build_residual_block(fc, drop_rate, fac*n_hidden, activation, 'Residual_Block_2', doBatchNorm=True)
fc = build_residual_block(fc, drop_rate, fac*n_hidden, activation, 'Residual_Block_3', doBatchNorm=True)
fc = build_residual_block(fc, drop_rate, fac*n_hidden, activation, 'Residual_Block_4', doBatchNorm=True)
# fc = build_residual_block(fc, drop_rate, fac*n_hidden, activation, 'Residual_Block_5', doBatchNorm=True)
#
# fc = build_residual_block(fc, drop_rate, fac*n_hidden, activation, 'Residual_Block_6', doBatchNorm=True)
#
# fc = build_residual_block(fc, drop_rate, fac*n_hidden, activation, 'Residual_Block_7', doBatchNorm=True)
#
# fc = build_residual_block(fc, drop_rate, fac*n_hidden, activation, 'Residual_Block_8', doBatchNorm=True)
fc = tf.keras.layers.Dense(n_hidden, activation=activation)(fc)
fc = tf.keras.layers.BatchNormalization()(fc)
print(fc.shape)
# activation = tf.nn.softmax
activation = tf.nn.sigmoid # For binary
logits = tf.keras.layers.Dense(NumLabels, activation=activation)(fc)
print(logits.shape)
self.logits = logits
def build_training(self):
self.loss = tf.keras.losses.BinaryCrossentropy(from_logits=False) # for two-class only
#self.loss = tf.keras.losses.SparseCategoricalCrossentropy() # For multi-class
# decayed_learning_rate = learning_rate * decay_rate ^ (global_step / decay_steps)
initial_learning_rate = 0.002
decay_rate = 0.95
steps_per_epoch = int(self.num_data_samples/BATCH_SIZE) # one epoch
# decay_steps = int(steps_per_epoch / 2)
decay_steps = 4 * steps_per_epoch
print('initial rate, decay rate, steps/epoch, decay steps: ', initial_learning_rate, decay_rate, steps_per_epoch, decay_steps)
self.learningRateSchedule = tf.keras.optimizers.schedules.ExponentialDecay(initial_learning_rate, decay_steps, decay_rate)
optimizer = tf.keras.optimizers.Adam(learning_rate=self.learningRateSchedule)
        if TRACK_MOVING_AVERAGE:
            # NOTE: TF1-style wiring; tf.control_dependencies on an optimizer
            # object is a no-op under eager TF2, so this path (disabled via
            # the flag above) would need reworking, e.g. applying the EMA
            # after each apply_gradients call.
            ema = tf.train.ExponentialMovingAverage(decay=0.999)
            with tf.control_dependencies([optimizer]):
                optimizer = ema.apply(self.model.trainable_variables)
self.optimizer = optimizer
self.initial_learning_rate = initial_learning_rate
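    # Worked example of the schedule above: ExponentialDecay (staircase=False)
    # gives lr(step) = 0.002 * 0.95 ** (step / decay_steps); with decay_steps
    # set to 4 * steps_per_epoch, the rate decays smoothly by 5% every four
    # epochs.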
def build_evaluation(self):
self.train_accuracy = tf.keras.metrics.BinaryAccuracy(name='train_accuracy')
self.test_accuracy = tf.keras.metrics.BinaryAccuracy(name='test_accuracy')
self.test_auc = tf.keras.metrics.AUC(name='test_auc')
self.test_recall = tf.keras.metrics.Recall(name='test_recall')
self.test_precision = tf.keras.metrics.Precision(name='test_precision')
self.train_loss = tf.keras.metrics.Mean(name='train_loss')
self.test_loss = tf.keras.metrics.Mean(name='test_loss')
    def build_predict(self):
        # With a single sigmoid output unit, tf.nn.top_k would always return
        # index 0; threshold the probability at 0.5 instead for the binary
        # two-class case.
        self.pred_class = tf.cast(self.logits > 0.5, tf.int32)
if TRACK_MOVING_AVERAGE:
self.variable_averages = tf.train.ExponentialMovingAverage(0.999, self.global_step)
self.variable_averages.apply(self.model.trainable_variables)
@tf.function
def train_step(self, mini_batch):
inputs = [mini_batch[0]]
labels = mini_batch[1]
with tf.GradientTape() as tape:
pred = self.model(inputs, training=True)
loss = self.loss(labels, pred)
total_loss = loss
if len(self.model.losses) > 0:
reg_loss = tf.math.add_n(self.model.losses)
total_loss = loss + reg_loss
gradients = tape.gradient(total_loss, self.model.trainable_variables)
self.optimizer.apply_gradients(zip(gradients, self.model.trainable_variables))
self.train_loss(loss)
self.train_accuracy(labels, pred)
return loss
@tf.function
def test_step(self, mini_batch):
inputs = [mini_batch[0]]
labels = mini_batch[1]
pred = self.model(inputs, training=False)
t_loss = self.loss(labels, pred)
self.test_loss(t_loss)
self.test_accuracy(labels, pred)
self.test_auc(labels, pred)
self.test_recall(labels, pred)
self.test_precision(labels, pred)
    def predict(self, mini_batch):
        inputs = [mini_batch[0]]
        labels = mini_batch[1]
        pred = self.model(inputs, training=False)
        t_loss = self.loss(labels, pred)
        # Record metrics so callers (e.g. restore) can report loss/accuracy.
        self.test_loss(t_loss)
        self.test_accuracy(labels, pred)
        return pred
def do_training(self, ckpt_dir=None):
if ckpt_dir is None:
if not os.path.exists(modeldir):
os.mkdir(modeldir)
ckpt = tf.train.Checkpoint(step=tf.Variable(1), model=self.model)
ckpt_manager = tf.train.CheckpointManager(ckpt, modeldir, max_to_keep=3)
else:
ckpt = tf.train.Checkpoint(step=tf.Variable(1), model=self.model)
ckpt_manager = tf.train.CheckpointManager(ckpt, ckpt_dir, max_to_keep=3)
self.writer_train = tf.summary.create_file_writer(os.path.join(logdir, 'plot_train'))
self.writer_valid = tf.summary.create_file_writer(os.path.join(logdir, 'plot_valid'))
step = 0
total_time = 0
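        # Two-level batching: train_dataset yields PROC_BATCH_SIZE chunks read
        # from HDF5; each chunk is re-wrapped as an in-memory Dataset and split
        # into BATCH_SIZE mini-batches for the actual gradient steps.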
for epoch in range(NUM_EPOCHS):
self.train_loss.reset_states()
self.train_accuracy.reset_states()
t0 = datetime.datetime.now().timestamp()
proc_batch_cnt = 0
n_samples = 0
for data0, label in self.train_dataset:
trn_ds = tf.data.Dataset.from_tensor_slices((data0, label))
trn_ds = trn_ds.batch(BATCH_SIZE)
for mini_batch in trn_ds:
if self.learningRateSchedule is not None:
loss = self.train_step(mini_batch)
if (step % 100) == 0:
with self.writer_train.as_default():
tf.summary.scalar('loss_trn', loss.numpy(), step=step)
tf.summary.scalar('num_train_steps', step, step=step)
tf.summary.scalar('num_epochs', epoch, step=step)
self.test_loss.reset_states()
self.test_accuracy.reset_states()
for data0_tst, label_tst in self.test_dataset:
tst_ds = tf.data.Dataset.from_tensor_slices((data0_tst, label_tst))
tst_ds = tst_ds.batch(BATCH_SIZE)
for mini_batch_test in tst_ds:
self.test_step(mini_batch_test)
with self.writer_valid.as_default():
tf.summary.scalar('loss_val', self.test_loss.result(), step=step)
tf.summary.scalar('acc_val', self.test_accuracy.result(), step=step)
tf.summary.scalar('num_train_steps', step, step=step)
tf.summary.scalar('num_epochs', epoch, step=step)
print('****** test loss, acc: ', self.test_loss.result().numpy(), self.test_accuracy.result().numpy())
step += 1
print('train loss: ', loss.numpy())
proc_batch_cnt += 1
n_samples += data0.shape[0]
print('proc_batch_cnt: ', proc_batch_cnt, n_samples)
t1 = datetime.datetime.now().timestamp()
print('End of Epoch: ', epoch+1, 'elapsed time: ', (t1-t0))
total_time += (t1-t0)
self.test_loss.reset_states()
self.test_accuracy.reset_states()
for data0, label in self.test_dataset:
ds = tf.data.Dataset.from_tensor_slices((data0, label))
ds = ds.batch(BATCH_SIZE)
for mini_batch in ds:
self.test_step(mini_batch)
print('loss, acc : ', self.test_loss.result().numpy(), self.test_accuracy.result().numpy())
print('---------------------------------------------------------')
ckpt_manager.save()
if self.DISK_CACHE and epoch == 0:
f = open(cachepath, 'wb')
pickle.dump(self.in_mem_data_cache, f)
f.close()
print('total time: ', total_time)
self.writer_train.close()
self.writer_valid.close()
def build_model(self):
flat = self.build_cnn()
# flat_1d = self.build_1d_cnn()
# flat = tf.keras.layers.concatenate([flat, flat_1d, flat_anc])
# flat = tf.keras.layers.concatenate([flat, flat_1d])
self.build_dnn(flat)
self.model = tf.keras.Model(self.inputs, self.logits)
def restore(self, ckpt_dir):
ckpt = tf.train.Checkpoint(step=tf.Variable(1), model=self.model)
ckpt_manager = tf.train.CheckpointManager(ckpt, ckpt_dir, max_to_keep=3)
ckpt.restore(ckpt_manager.latest_checkpoint)
self.test_loss.reset_states()
self.test_accuracy.reset_states()
        # test_dataset yields (data, label) pairs, not three tensors.
        for data0, label in self.test_dataset:
            ds = tf.data.Dataset.from_tensor_slices((data0, label))
ds = ds.batch(BATCH_SIZE)
for mini_batch_test in ds:
self.predict(mini_batch_test)
print('loss, acc: ', self.test_loss.result(), self.test_accuracy.result())
def run(self, filename, filename_l1b=None, train_dict=None, valid_dict=None):
with tf.device('/device:GPU:'+str(self.gpu_device)):
self.setup_pipeline(filename, train_idxs=train_dict, test_idxs=valid_dict)
self.build_model()
self.build_training()
self.build_evaluation()
self.do_training()
def run_restore(self, matchup_dict, ckpt_dir):
self.setup_pipeline(None, None, matchup_dict)
self.build_model()
self.build_training()
self.build_evaluation()
self.restore(ckpt_dir)
if __name__ == "__main__":
nn = IcingIntensityNN()
nn.run('matchup_filename')