RETOMOS/retomos/retomos_malware_classifier.py

380 lines
18 KiB
Python

#####################################################
# Title: HTML parse- and analyser
# Author: Jesper Bergman (jesperbe@dsv.su.se)
# Licence: GPLv2
#####################################################
#!/usr/bin/python
import sys
import sqlite3
import datetime
import timeit
import math
import re
import pandas as pd
import numpy as np
from time import time, sleep
from sklearn.naive_bayes import MultinomialNB
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score, cross_validate, train_test_split
#from sklearn.naive_bayes import *
from sklearn.metrics import roc_auc_score, balanced_accuracy_score, precision_recall_curve, classification_report, precision_recall_fscore_support, roc_curve, average_precision_score, auc, confusion_matrix
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
from sklearn.feature_selection import SelectKBest, chi2, VarianceThreshold
from sklearn.tree import DecisionTreeClassifier, export_text, export_graphviz
from sklearn.linear_model import LogisticRegression
from sklearn import svm
from sklearn import tree
from mglearn import make_blobs
import matplotlib.pyplot as plt
import graphviz
'''
OPEN UP DATABASE AND FETCH DATA
'''
# Matches http:// and https:// URLs (host part only) inside extracted strings.
_URL_PATTERN = re.compile(r"http[s]?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+")


def _print_progress(counter, counter_length):
    """Print a dot every 100 processed samples and a line at fixed percentages."""
    if counter % 100 == 0:
        sys.stdout.write(".")
        sys.stdout.flush()
    milestones = ((0.1, "10%"), (0.2, "20%"), (0.5, "50%"), (0.7, "70%"),
                  (0.8, "80%"), (0.9, "90%"), (0.95, "95%"))
    for fraction, label in milestones:
        if counter == math.ceil(fraction * counter_length):
            print(label)


def _fetch_label(cursor, sha):
    """Return the first `tor_related` value stored for `sha`, or None if absent."""
    cursor.execute("SELECT tor_related FROM label WHERE sha256=?;", (sha,))
    row = cursor.fetchone()
    return row[0] if row else None


def _fetch_sample_features(cursor, sha):
    """Return the (dns, dll, api) feature strings stored for the sample `sha`.

    Each string is the ``str()`` of the fetched row list, which is the text
    representation the classifiers are trained on.
    """
    cursor.execute("SELECT dns FROM network WHERE sha256=?;", (sha,))
    dns_string = str(cursor.fetchall())
    cursor.execute("SELECT name FROM dlls WHERE sha256=?;", (sha,))
    dll_string = str(cursor.fetchall())
    cursor.execute("SELECT name,tor_related FROM api_calls WHERE sha256=?;", (sha,))
    api_string = str(cursor.fetchall())
    return dns_string, dll_string, api_string


def _collect_urls(cursor, sha256_list):
    """Return the unique URLs found in the stored strings of the given samples.

    URLs are returned in the order in which they are first seen.
    """
    unique_urls = {}  # dict used as an insertion-ordered set
    for sample_sha in sha256_list:
        cursor.execute("SELECT strings FROM strings WHERE sha256=?;", (str(sample_sha),))
        for row in cursor.fetchall():
            for text in row:
                if not isinstance(text, str):
                    # NULL or binary columns cannot be searched with a text pattern.
                    continue
                for url in _URL_PATTERN.findall(text):
                    unique_urls.setdefault(url, None)
    return list(unique_urls)


def connect_to_database(action, training_db, urls, unknown_samples, sha256):
    """Load the samples from the SQLite database and run the classifiers on them.

    For every sample (identified by its SHA256) the registry key paths, DNS
    entries, DLL names and API calls are read and turned into text features.
    Samples labelled 0 or 1 form the training set, samples labelled 2 are the
    prediction candidates; samples without a label row (or with any other
    label) are ignored. The resulting data frames are handed to
    ``build_classifiers``.

    Args:
        action: The function only does work when this compares equal to
            ``False``; otherwise it returns immediately.
        training_db: Path of the SQLite database (converted with ``str()``).
        urls: When equal to ``True``, the URLs found in the stored strings of
            the samples returned by ``build_classifiers`` are printed.
        unknown_samples: Not used; kept for interface compatibility.
        sha256: Not used; kept for interface compatibility.

    Returns:
        None. Results are printed. SQLite errors are reported on stdout and
        are not re-raised.
    """
    if action != False:  # noqa: E712 - keep the original equality semantics
        return

    # Stays None if connecting fails, so the cleanup below can tell.
    training_db_connection = None
    try:
        training_db_connection = sqlite3.connect(str(training_db))
        training_db_cursor = training_db_connection.cursor()

        # Group every registry key path under its sample so that each sample
        # is queried exactly once and none of its keys is lost.
        training_db_cursor.execute("SELECT sha256, path FROM reg_keys;")
        reg_keys_by_sha = {}
        for row_sha, path in training_db_cursor.fetchall():
            reg_keys_by_sha.setdefault(row_sha, []).append(path)

        reg_keys_dict = {}
        reg_keys_combined = {}
        unknown_samples_dict = {}
        unknown_samples_combined = {}

        print("Fetching data from database. Processing.")
        sample_count = len(reg_keys_by_sha)
        for counter, (cur_sha, reg_keys_list) in enumerate(reg_keys_by_sha.items(), start=1):
            _print_progress(counter, sample_count)

            # 0 and 1 are known classes, 2 marks a prediction candidate.
            label = _fetch_label(training_db_cursor, cur_sha)
            if label not in (0, 1, 2):
                continue
            tor_related = int(label)

            reg_keys_string = str(reg_keys_list)
            dns_string, dll_string, api_string = _fetch_sample_features(training_db_cursor, cur_sha)
            separate_features = [reg_keys_string, dns_string, dll_string, api_string, tor_related]
            combined_features = [
                " ".join((reg_keys_string, dns_string, dll_string, api_string)),
                tor_related,
            ]

            if tor_related == 2:
                unknown_samples_dict[cur_sha] = separate_features
                unknown_samples_combined[cur_sha] = combined_features
            else:
                reg_keys_dict[cur_sha] = separate_features
                reg_keys_combined[cur_sha] = combined_features

        if not reg_keys_dict:
            # Without labelled samples there is nothing to train on.
            print("No labelled training samples found in database.")
            return

        # One row per sample (index = SHA256), one column per feature.
        training_df2 = pd.DataFrame(reg_keys_dict).T
        training_df3 = pd.DataFrame(reg_keys_combined).T
        unknown_df2 = pd.DataFrame(unknown_samples_dict).T
        unknown_df3 = pd.DataFrame(unknown_samples_combined).T

        predictions_SHA256_list = build_classifiers(training_df2, training_df3, unknown_df2, unknown_df3)

        # If URLs flag enabled, go fetch URLs
        if urls == True:  # noqa: E712 - keep the original equality semantics
            print("|-- Tor Malware\n", predictions_SHA256_list)
            unique_onion_urls = _collect_urls(training_db_cursor, predictions_SHA256_list)
            print("|--- Onion URLs \n", unique_onion_urls)
    except sqlite3.Error as err:
        print("Sqlite error:", err)
    finally:
        if training_db_connection is not None:
            training_db_connection.close()
"""
BUILD CLASSIFICATION MODELS
"""
def _cv_accuracy(estimator, X, y, folds):
    """Return the mean accuracy of `estimator` over `folds`-fold cross validation."""
    return cross_val_score(estimator, X, y, cv=folds, scoring="accuracy").mean()


def build_classifiers(df2, df3, unknown_df2, unknown_df3):
    """Train the classifiers on the labelled samples and print their scores.

    The combined feature text of every sample is converted into a
    bag-of-words count matrix and split 80/20 into a training and a hold-out
    part. Multinomial naive Bayes, logistic regression, random forest and
    decision tree models are fitted on the training part and a classification
    report on the hold-out part is printed for each of them. A
    cross-validated accuracy is printed for those four models and for a
    support vector machine.

    Args:
        df2: Data frame whose column ``4`` holds the class label of each
            sample. It must have one row per row of ``df3``, in the same
            order.
        df3: Data frame whose column ``0`` holds the combined feature text of
            each sample.
        unknown_df2: Not used; kept for interface compatibility.
        unknown_df3: Not used; kept for interface compatibility.

    Returns:
        The list returned by ``verify_predictions`` for the decision tree
        predictions on the hold-out part.
    """
    # TODO(review): predictions for the unknown samples are not reported yet;
    # unknown_df2 / unknown_df3 are therefore left untouched.
    cv_folds = 5

    # Create bag of words for the feature text.
    vect = CountVectorizer(lowercase=False)
    X = vect.fit_transform(df3[0])

    # Target/class labels
    y = df2[4].astype('int')

    # 80 / 20 split training and testing data. Shuffle just in case.
    X_train, X_test, y_train, y_test = train_test_split(X, y, shuffle=True, test_size=0.2)

    # NOTE(review): the cross validation below runs on the hold-out part only,
    # not on the full data set - confirm that this is intended.

    # Naive Bayes
    nb_clf = MultinomialNB().fit(X_train, y_train)
    mnb_prediction = nb_clf.predict(X_test)
    print("\n | ---- MNB cross validation score: ", _cv_accuracy(nb_clf, X_test, y_test, cv_folds))
    print(classification_report(y_test, mnb_prediction))

    # Support Vector Machine (only cross-validated, no hold-out report)
    svm_clf = svm.SVC(C=2, cache_size=9000)
    print("\n\n|---- SVM {}-fold cross validation accuracy score:{}".format(
        cv_folds, _cv_accuracy(svm_clf, X_test, y_test, cv_folds)))

    # Logistic regression classifier
    logreg = LogisticRegression(max_iter=4000).fit(X_train, y_train)
    lr_prediction = logreg.predict(X_test)
    print("\n\n|---- LR {}-fold cross validation accuracy score:{}".format(
        cv_folds, _cv_accuracy(logreg, X_test, y_test, cv_folds)))
    print(classification_report(y_test, lr_prediction, zero_division=1))

    # Random forest classifier
    rf_clf = RandomForestClassifier(max_depth=2, random_state=0)
    rf_clf.fit(X_train, y_train)
    rf_prediction = rf_clf.predict(X_test)
    print("\n\n|---- RF {}-fold cross validation accuracy score: {}".format(
        cv_folds, _cv_accuracy(rf_clf, X_test, y_test, cv_folds)))
    print(classification_report(y_test, rf_prediction))

    # Decision tree classifier
    dt_clf = DecisionTreeClassifier()
    dt_clf.fit(X_train, y_train)
    dt_prediction = dt_clf.predict(X_test)
    print("\n\n|---- DT {}-fold cross validation accuracy score:{} ".format(
        cv_folds, _cv_accuracy(dt_clf, X_test, y_test, cv_folds)))
    print("\nDT score: ", dt_clf.score(X_test, y_test), "\nDT classification report\n\n", classification_report(y_test, dt_prediction), export_text(dt_clf, show_weights=True))
    print("DT y_predictions: ", dt_prediction, "y_test: ", y_test)

    # Verify predictions with the true labels
    return verify_predictions(dt_prediction, y_test)
def verify_predictions(X_predictions_list, y_true):
    """Return the identifiers of the samples whose prediction equals the true label.

    Predictions and true labels are paired by position: the n-th prediction
    is compared with the n-th entry of ``y_true``. If the two differ in
    length, the surplus entries of the longer one are ignored.

    Args:
        X_predictions_list: Sequence of predicted labels; each one is
            converted with ``int()`` before the comparison.
        y_true: Mapping-like object offering ``items()`` that yields
            ``(identifier, true_label)`` pairs, e.g. a pandas Series indexed
            by SHA256.

    Returns:
        List of the identifiers whose prediction matched, in input order.
        A line is printed for every match.
    """
    verified_predictions_SHA256_list = []
    for X_prediction, (y_index, y_value) in zip(X_predictions_list, y_true.items()):
        if int(X_prediction) == y_value:
            print("|--- Prediction matches the true label on file with SHA256: ", y_index)
            verified_predictions_SHA256_list.append(y_index)
    return verified_predictions_SHA256_list
# Script entry point: parse the command line with docopt and pass the parsed
# arguments on to main().
# NOTE(review): neither `docopt` nor `main` is imported or defined in the
# visible part of this file, and docopt reads its usage text from the module
# docstring (`__doc__`) - confirm all three are provided before running this
# module as a script.
if __name__ == "__main__":
    arguments = docopt(__doc__, version='retomos 0.1')
    main(arguments)