classification.py 11.04 KiB
import pandas as pd
import pylab as pl
import numpy as np
import scipy.optimize as opt
from sklearn import preprocessing, svm
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, accuracy_score, jaccard_score, f1_score, precision_score, recall_score, roc_auc_score
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import KNeighborsClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
import itertools
import sklearn.tree as tree
from sklearn.tree import export_graphviz
# def analyze(dataFrame):
# no_icing_df = dataFrame[dataFrame['icing_intensity'] == -1]
# icing_df = dataFrame[dataFrame['icing_intensity'] >= 1]
# return no_icing_df, icing_df
def plot_confusion_matrix(cm, classes,
normalize=False,
title='Confusion matrix',
cmap=plt.cm.Blues):
"""
This function prints and plots the confusion matrix.
Normalization can be applied by setting `normalize=True`.
"""
if normalize:
cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
print("Normalized confusion matrix")
else:
print('Confusion matrix, without normalization')
print(cm)
plt.imshow(cm, interpolation='nearest', cmap=cmap)
plt.title(title)
plt.colorbar()
tick_marks = np.arange(len(classes))
plt.xticks(tick_marks, classes, rotation=45)
plt.yticks(tick_marks, classes)
fmt = '.2f' if normalize else 'd'
thresh = cm.max() / 2.
for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
plt.text(j, i, format(cm[i, j], fmt),
horizontalalignment="center",
color="white" if cm[i, j] > thresh else "black")
plt.tight_layout()
plt.ylabel('True label')
plt.xlabel('Predicted label')
def get_csv_as_dataframe(csv_file, reduce_frac=None, random_state=42):
icing_df = pd.read_csv(csv_file)
# Random selection of reduce_frac of the rows
if reduce_frac is not None:
icing_df = icing_df.sample(axis=0, frac=reduce_frac, random_state=random_state)
# # remove approximately half of rows where column_name equals to column_value
# column_name = 'icing_intensity'
# column_value = -1
# if column_name in icing_df.columns:
# df_to_reduce = icing_df[icing_df[column_name] == column_value]
# icing_df = icing_df[icing_df[column_name] != column_value]
#
# if reduce_frac is not None:
# df_to_reduce = df_to_reduce.sample(axis=0, frac=0.5, random_state=random_state)
#
# icing_df = pd.concat([icing_df, df_to_reduce])
return icing_df
def get_feature_target_data(data_frame, standardize=True):
icing_df = data_frame
# The independent variables (features) we want to use:
params = ['cld_geo_thick', 'cld_temp_acha', 'conv_cloud_fraction', 'supercooled_cloud_fraction', 'cld_reff_dcomp',
'cld_opd_dcomp', 'iwc_dcomp']
# Remove this column
icing_df = icing_df.drop('lwc_dcomp', axis=1)
# Remove this column
icing_df = icing_df.drop('cld_geo_thick', axis=1)
# Remove rows with NaN values
# icing_df = icing_df.dropna()
x = np.asarray(icing_df[params])
print('num obs, features: ', x.shape)
if standardize:
x = preprocessing.StandardScaler().fit(x).transform(x)
# The dependent variable (target) ------------------------------
y = np.asarray(icing_df['icing_intensity'])
y = np.where(y == -1, 0, y)
y = np.where(y >= 1, 1, y)
print('num no icing: ', np.sum(y == 0))
print('num icing: ', np.sum(y == 1))
return x, y
def logistic_regression(x, y, x_test=None, y_test=None):
if x_test is None:
x_train, x_test, y_train, y_test = train_test_split( x, y, test_size=0.2, random_state=4)
else:
x_train = x
y_train = y
print('Train set:', x_train.shape, y_train.shape)
print('Test set:', x_test.shape, y_test.shape)
x_train = np.where(np.isnan(x_train), 0, x_train)
x_test = np.where(np.isnan(x_test), 0, x_test)
print('num no icing test: ', np.sum(y_test == 0))
print('num icing test: ', np.sum(y_test == 1))
LR = LogisticRegression(C=0.01, solver='liblinear').fit(x_train, y_train)
yhat = LR.predict(x_test)
yhat_prob = LR.predict_proba(x_test)
print(confusion_matrix(y_test, yhat, labels=[1,0]))
print('Accuracy: ', "{:.4f}".format(accuracy_score(y_test, yhat)))
print('Jaccard Idx: ', "{:.4f}".format(jaccard_score(y_test, yhat)))
print('Precision: ', "{:.4f}".format(precision_score(y_test, yhat)))
print('Recall: ', "{:.4f}".format(recall_score(y_test, yhat)))
print('F1: ', "{:.4f}".format(f1_score(y_test, yhat)))
print('AUC: ', "{:.4f}".format(roc_auc_score(y_test, yhat_prob[:, 1])))
def k_nearest_neighbors(x, y, k=4):
x_train, x_test, y_train, y_test = train_test_split( x, y, test_size=0.2, random_state=4)
print('Train set:', x_train.shape, y_train.shape)
print('Test set:', x_test.shape, y_test.shape)
x_train = np.where(np.isnan(x_train), 0, x_train)
x_test = np.where(np.isnan(x_test), 0, x_test)
print('num no icing test: ', np.sum(y_test == 0))
print('num icing test: ', np.sum(y_test == 1))
KN_C = KNeighborsClassifier(n_neighbors=k).fit(x_train, y_train)
yhat = KN_C.predict(x_test)
yhat_prob = KN_C.predict_proba(x_test)
print('Accuracy: ', "{:.4f}".format(accuracy_score(y_test, yhat)))
print('Jaccard Idx: ', "{:.4f}".format(jaccard_score(y_test, yhat)))
print('Precision: ', "{:.4f}".format(precision_score(y_test, yhat)))
print('Recall: ', "{:.4f}".format(recall_score(y_test, yhat)))
print('F1: ', "{:.4f}".format(f1_score(y_test, yhat)))
print('AUC: ', "{:.4f}".format(roc_auc_score(y_test, yhat_prob[:, 1])))
def k_nearest_neighbors_all(x, y, k_s=10):
x_train, x_test, y_train, y_test = train_test_split( x, y, test_size=0.2, random_state=4)
print('Train set:', x_train.shape, y_train.shape)
print('Test set:', x_test.shape, y_test.shape)
x_train = np.where(np.isnan(x_train), 0, x_train)
x_test = np.where(np.isnan(x_test), 0, x_test)
print('num no icing test: ', np.sum(y_test == 0))
print('num icing test: ', np.sum(y_test == 1))
k_s = 10
mean_acc = np.zeros((k_s - 1))
std_acc = np.zeros((k_s - 1))
for n in range(1, k_s):
KN_C = KNeighborsClassifier(n_neighbors=n).fit(x_train, y_train)
yhat = KN_C.predict(x_test)
yhat_prob = KN_C.predict_proba(x_test)
print('Accuracy: ', "{:.4f}".format(accuracy_score(y_test, yhat)))
print('Jaccard Idx: ', "{:.4f}".format(jaccard_score(y_test, yhat)))
print('Precision: ', "{:.4f}".format(precision_score(y_test, yhat)))
print('Recall: ', "{:.4f}".format(recall_score(y_test, yhat)))
print('F1: ', "{:.4f}".format(f1_score(y_test, yhat)))
print('AUC: ', "{:.4f}".format(roc_auc_score(y_test, yhat_prob[:, 1])))
mean_acc[n - 1] = accuracy_score(y_test, yhat)
std_acc[n - 1] = np.std(yhat == y_test) / np.sqrt(yhat.shape[0])
print("The best accuracy was with", mean_acc.max(), "with k=", mean_acc.argmax() + 1)
plt.plot(range(1, k_s), mean_acc, 'g')
plt.fill_between(range(1, k_s), mean_acc - 1 * std_acc, mean_acc + 1 * std_acc, alpha=0.10)
plt.fill_between(range(1, k_s), mean_acc - 3 * std_acc, mean_acc + 3 * std_acc, alpha=0.10, color="green")
plt.legend(('Accuracy ', '+/- 1xstd', '+/- 3xstd'))
plt.ylabel('Accuracy ')
plt.xlabel('Number of Neighbors (K)')
plt.tight_layout()
plt.show()
def decision_tree(x, y, criterion='entropy', max_depth=4):
x_train, x_test, y_train, y_test = train_test_split( x, y, test_size=0.2, random_state=4)
print('Train set:', x_train.shape, y_train.shape)
print('Test set:', x_test.shape, y_test.shape)
x_train = np.where(np.isnan(x_train), 0, x_train)
x_test = np.where(np.isnan(x_test), 0, x_test)
print('num no icing test: ', np.sum(y_test == 0))
print('num icing test: ', np.sum(y_test == 1))
DT = DecisionTreeClassifier(criterion=criterion, max_depth=max_depth).fit(x_train, y_train)
yhat = DT.predict(x_test)
yhat_prob = DT.predict_proba(x_test)
print('Accuracy: ', "{:.4f}".format(accuracy_score(y_test, yhat)))
print('Jaccard Idx: ', "{:.4f}".format(jaccard_score(y_test, yhat)))
print('Precision: ', "{:.4f}".format(precision_score(y_test, yhat)))
print('Recall: ', "{:.4f}".format(recall_score(y_test, yhat)))
print('F1: ', "{:.4f}".format(f1_score(y_test, yhat)))
print('AUC: ', "{:.4f}".format(roc_auc_score(y_test, yhat_prob[:, 1])))
return DT
# export_graphviz(DT, out_file='tree.dot', filled=True, feature_names=['cld_geo_thick', 'cld_temp_acha', 'conv_cloud_fraction', 'supercooled_cloud_fraction', 'cld_reff_dcomp', 'cld_opd_dcomp', 'iwc_dcomp'])
# !dot -Tpng tree.dot -o tree.png
def SVM(x, y, kernel='rbf'):
x_train, x_test, y_train, y_test = train_test_split( x, y, test_size=0.2, random_state=4)
print('Train set:', x_train.shape, y_train.shape)
print('Test set:', x_test.shape, y_test.shape)
x_train = np.where(np.isnan(x_train), 0, x_train)
x_test = np.where(np.isnan(x_test), 0, x_test)
print('num no icing test: ', np.sum(y_test == 0))
print('num icing test: ', np.sum(y_test == 1))
clf = svm.SVC(kernel=kernel)
clf = clf.fit(x_train, y_train)
yhat = clf.predict(x_test)
print('Accuracy: ', "{:.4f}".format(accuracy_score(y_test, yhat)))
print('Jaccard Idx: ', "{:.4f}".format(jaccard_score(y_test, yhat)))
print('Precision: ', "{:.4f}".format(precision_score(y_test, yhat)))
print('Recall: ', "{:.4f}".format(recall_score(y_test, yhat)))
print('F1: ', "{:.4f}".format(f1_score(y_test, yhat)))
def random_forest(x, y, x_test=None, y_test=None, criterion='entropy', max_depth=4):
if x_test is None:
x_train, x_test, y_train, y_test = train_test_split( x, y, test_size=0.2, random_state=4)
else:
x_train = x
y_train = y
print('Train set:', x_train.shape, y_train.shape)
print('Test set:', x_test.shape, y_test.shape)
x_train = np.where(np.isnan(x_train), 0, x_train)
x_test = np.where(np.isnan(x_test), 0, x_test)
print('num no icing test: ', np.sum(y_test == 0))
print('num icing test: ', np.sum(y_test == 1))
rnd_clf = RandomForestClassifier(criterion=criterion, max_depth=max_depth).fit(x_train, y_train)
yhat = rnd_clf.predict(x_test)
yhat_prob = rnd_clf.predict_proba(x_test)
print('Accuracy: ', "{:.4f}".format(accuracy_score(y_test, yhat)))
print('Jaccard Idx: ', "{:.4f}".format(jaccard_score(y_test, yhat)))
print('Precision: ', "{:.4f}".format(precision_score(y_test, yhat)))
print('Recall: ', "{:.4f}".format(recall_score(y_test, yhat)))
print('F1: ', "{:.4f}".format(f1_score(y_test, yhat)))
print('AUC: ', "{:.4f}".format(roc_auc_score(y_test, yhat_prob[:, 1])))