[系统安全] 四十九.恶意家族分类 (1)基于API序列和机器学习的恶意家族分类实例详解

VSole2023-06-02 09:28:55

一.恶意软件分析

恶意软件或恶意代码分析通常包括静态分析和动态分析。特征种类如果按照恶意代码是否在用户环境或仿真环境中运行,可以划分为静态特征和动态特征。

那么,如何提取恶意软件的静态特征或动态特征呢? 因此,第一部分将简要介绍静态特征和动态特征。

1.静态特征

没有真实运行的特征,通常包括:

  • 字节码
  • 二进制代码转换成了字节码,比较原始的一种特征,没有进行任何处理
  • IAT表
  • PE结构中比较重要的部分,声明了一些函数及所在位置,便于程序执行时导入,表和功能比较相关
  • Android权限表
  • 如果你的APP声明了一些功能用不到的权限,可能存在恶意目的,如手机信息
  • 可打印字符
  • 将二进制代码转换为ASCII码,进行相关统计
  • IDA反汇编跳转块
  • IDA工具调试时的跳转块,对其进行处理作为序列数据或图数据
  • 常用API函数
  • 恶意软件图像化

静态特征提取方式:

  • CAPA
  • – https://github.com/mandiant/capa
  • IDA Pro
  • 安全厂商沙箱

2.动态特征

相当于静态特征更耗时,它要真正去执行代码。通常包括:

– API调用关系:比较明显的特征,调用了哪些API,表述对应的功能

– 控制流图:软件工程中比较常用,机器学习将其表示成向量,从而进行分类

– 数据流图:软件工程中比较常用,机器学习将其表示成向量,从而进行分类

动态特征提取方式:

  • Cuckoo
  • – https://github.com/cuckoosandbox/cuckoo
  • CAPE
  • – https://github.com/kevoreilly/CAPEv2
  • – https://capev2.readthedocs.io/en/latest/
  • 安全厂商沙箱

二.基于逻辑回归的恶意家族检测

前面的系列文章详细介绍如何提取恶意软件的静态和动态特征,包括API序列。接下来将构建机器学习模型学习API序列实现分类。基本流程如下:

1.数据集

整个数据集包括5类恶意家族的样本,每个样本经过先前的CAPE工具成功提取的动态API序列。数据集分布情况如下所示:(建议读者提取自己数据集的样本,包括BIG2015、BODMAS等)

恶意家族类别数量训练集测试集AAAAclass1352242110BBBBclass2335235100CCCCclass3363243120DDDDclass4293163130EEEEclass5548358190

数据集分为训练集和测试集,如下图所示:

数据集中主要包括四个字段,即序号、恶意家族类别、Md5值、API序列或特征。

需要注意,在特征提取过程中涉及大量数据预处理和清洗的工作,读者需要结合实际需求完成。比如提取特征为空值的过滤代码。


#coding:utf-8#By:Eastmount CSDN 2023-05-31import csvimport reimport os
csv.field_size_limit(500 * 1024 * 1024)filename = "AAAA_result.csv"writename = "AAAA_result_final.csv"fw = open(writename, mode="w", newline="")writer = csv.writer(fw)writer.writerow(['no', 'type', 'md5', 'api'])with open(filename,encoding='utf-8') as fr:    reader = csv.reader(fr)    no = 1    for row in reader: #['no','type','md5','api']        tt = row[1]        md5 = row[2]        api = row[3]        #print(no,tt,md5,api)        #api空值的过滤        if api=="" or api=="api":            continue        else:            writer.writerow([str(no),tt,md5,api])            no += 1fr.close()

2.模型构建

由于机器学习算法比较简单,这里仅给出关键代码。此外,常用特征表征包括TF-IDF和Word2Vec,这里采用TF-IDF计算特征向量,读者可以尝试Word2Vec,最终实现家族分类并取得0.6215的Acc值。


# -*- coding: utf-8 -*-# By:Eastmount CSDN 2023-06-01import osimport csvimport timeimport numpy as npimport seaborn as snsfrom sklearn import metricsfrom sklearn.feature_extraction.text import CountVectorizerfrom sklearn.feature_extraction.text import TfidfTransformerfrom sklearn.preprocessing import LabelEncoder, OneHotEncoderfrom sklearn.model_selection import train_test_splitfrom sklearn.decomposition import PCAfrom sklearn.linear_model import LogisticRegressionfrom sklearn.metrics import classification_report
start = time.clock()csv.field_size_limit(500 * 1024 * 1024)
#---------------------------第一步 加载数据集------------------------#训练集file = "train_dataset.csv"label_train = []content_train = []with open(file, "r") as csv_file:    csv_reader = csv.reader(csv_file)    header = next(csv_reader)    for row in csv_reader:        label_train.append(row[1])        value = str(row[3])        content_train.append(value)print(label_train[:2])print(content_train[:2])
#测试集file = "test_dataset.csv"label_test = []content_test = []with open(file, "r") as csv_file:    csv_reader = csv.reader(csv_file)    header = next(csv_reader)    for row in csv_reader:        label_test.append(row[1])        value = str(row[3])        content_test.append(value)print(len(label_train),len(label_test))print(len(content_train),len(content_test)) #1241 650
#---------------------------第二步 向量转换------------------------contents = content_train + content_testlabels = label_train + label_test
#计算词频 min_df max_dfvectorizer = CountVectorizer()X = vectorizer.fit_transform(contents)words = vectorizer.get_feature_names()print(words[:10])print("特征词数量:",len(words))
#计算TF-IDFtransformer = TfidfTransformer()tfidf = transformer.fit_transform(X)weights = tfidf.toarray()
#---------------------------第三步 编码转换------------------------le = LabelEncoder()y = le.fit_transform(labels)X_train, X_test = weights[:1241], weights[1241:]y_train, y_test = y[:1241], y[1241:]
#---------------------------第四步 分类检测------------------------clf = LogisticRegression(solver='liblinear')clf.fit(X_train, y_train)pre = clf.predict(X_test)print(clf)print(classification_report(y_test, pre, digits=4))print("accuracy:")print(metrics.accuracy_score(y_test, pre))
#计算时间elapsed = (time.clock() - start)print("Time used:", elapsed)

输出结果如下图所示:


1241 6501241 650['__anomaly__', 'accept', 'bind', 'changewindowmessagefilter', 'closesocket', 'clsidfromprogid', 'cocreateinstance', 'cocreateinstanceex', 'cogetclassobject', 'colescript_parsescripttext']特征词数量: 269LogisticRegression(solver='liblinear')              precision    recall  f1-score   support
           0     0.5398    0.5545    0.5471       110           1     0.6526    0.6200    0.6359       100           2     0.6596    0.5167    0.5794       120           3     0.8235    0.5385    0.6512       130           4     0.5665    0.7842    0.6578       190
    accuracy                         0.6215       650   macro avg     0.6484    0.6028    0.6143       650weighted avg     0.6438    0.6215    0.6199       650
accuracy:0.6215384615384615Time used: 2.2597622

三.基于SVM的恶意家族检测

1.SVM模型

SVM分类算法的核心思想是通过建立某种核函数,将数据在高维寻找一个满足分类要求的超平面,使训练集中的点距离分类面尽可能的远,即寻找一个分类面使得其两侧的空白区域最大。如图19.16所示,两类样本中离分类面最近的点且平行于最优分类面的超平面上的训练样本就叫做支持向量。

SVM分类算法在Sklearn机器学习包中,实现的类是 svm.SVC,即C-Support Vector Classification,它是基于libsvm实现的。构造方法如下:


SVC(C=1.0,   cache_size=200,   class_weight=None,   coef0=0.0,  decision_function_shape=None,   degree=3,   gamma='auto',   kernel='rbf',  max_iter=-1,   probability=False,   random_state=None,   shrinking=True,  tol=0.001,   verbose=False)

SVC算法主要包括两个步骤:

  • 训练nbrs.fit(data, target)
  • 预测pre = clf.predict(data)

2.代码实现

下面仅给出SVM实现恶意家族分类的关键代码,该算法也是各类安全任务中的常用模型。需要注意,这里将预测结果保存至文件中,在真实实验中,建议大家多将实验过程数据保存,从而能更好地比较各种性能,体现论文的贡献。


# -*- coding: utf-8 -*-# By:Eastmount CSDN 2023-06-01import osimport csvimport timeimport numpy as npimport seaborn as snsfrom sklearn import svmfrom sklearn import metricsfrom sklearn.feature_extraction.text import CountVectorizerfrom sklearn.feature_extraction.text import TfidfTransformerfrom sklearn.preprocessing import LabelEncoder, OneHotEncoderfrom sklearn.model_selection import train_test_splitfrom sklearn.decomposition import PCAfrom sklearn.linear_model import LogisticRegressionfrom sklearn.metrics import classification_report
start = time.clock()csv.field_size_limit(500 * 1024 * 1024)
#---------------------------第一步 加载数据集------------------------#训练集file = "train_dataset.csv"label_train = []content_train = []with open(file, "r") as csv_file:    csv_reader = csv.reader(csv_file)    header = next(csv_reader)    for row in csv_reader:        label_train.append(row[1])        value = str(row[3])        content_train.append(value)print(label_train[:2])print(content_train[:2])
#测试集file = "test_dataset.csv"label_test = []content_test = []with open(file, "r") as csv_file:    csv_reader = csv.reader(csv_file)    header = next(csv_reader)    for row in csv_reader:        label_test.append(row[1])        value = str(row[3])        content_test.append(value)print(len(label_train),len(label_test))print(len(content_train),len(content_test)) #1241 650
#---------------------------第二步 向量转换------------------------contents = content_train + content_testlabels = label_train + label_test
#计算词频 min_df max_dfvectorizer = CountVectorizer()X = vectorizer.fit_transform(contents)words = vectorizer.get_feature_names()print(words[:10])print("特征词数量:",len(words))
#计算TF-IDFtransformer = TfidfTransformer()tfidf = transformer.fit_transform(X)weights = tfidf.toarray()
#---------------------------第三步 编码转换------------------------le = LabelEncoder()y = le.fit_transform(labels)X_train, X_test = weights[:1241], weights[1241:]y_train, y_test = y[:1241], y[1241:]
#---------------------------第四步 分类检测------------------------clf = svm.LinearSVC()clf.fit(X_train, y_train)pre = clf.predict(X_test)print(clf)print(classification_report(y_test, pre, digits=4))print("accuracy:")print(metrics.accuracy_score(y_test, pre))
#结果存储f1 = open("svm_test_pre.txt", "w")for n in pre:    f1.write(str(n) + "")f1.close()
f2 = open("svm_test_y.txt", "w")for n in y_test:    f2.write(str(n) + "")f2.close()
#计算时间elapsed = (time.clock() - start)print("Time used:", elapsed)

实验结果如下图所示:

1241 6501241 650
['__anomaly__', 'accept', 'bind', 'changewindowmessagefilter', 'closesocket', 'clsidfromprogid', 'cocreateinstance', 'cocreateinstanceex', 'cogetclassobject', 'colescript_parsescripttext']特征词数量: 269LinearSVC()              precision    recall  f1-score   support
           0     0.6439    0.7727    0.7025       110           1     0.8780    0.7200    0.7912       100           2     0.7315    0.6583    0.6930       120           3     0.9091    0.6154    0.7339       130           4     0.6583    0.8316    0.7349       190
    accuracy                         0.7292       650   macro avg     0.7642    0.7196    0.7311       650weighted avg     0.7534    0.7292    0.7301       650
accuracy:0.7292307692307692Time used: 2.2672032

四.基于随机森林的恶意家族检测

该部分关键代码如下,并且补充可视化分析代码。


# -*- coding: utf-8 -*-# By:Eastmount CSDN 2023-06-01import osimport csvimport timeimport numpy as npimport seaborn as snsfrom sklearn import svmfrom sklearn import metricsfrom sklearn.feature_extraction.text import CountVectorizerfrom sklearn.feature_extraction.text import TfidfTransformerfrom sklearn.preprocessing import LabelEncoder, OneHotEncoderfrom sklearn.model_selection import train_test_splitfrom sklearn.decomposition import PCAfrom sklearn.linear_model import LogisticRegressionfrom sklearn.ensemble import RandomForestClassifierfrom sklearn.metrics import classification_reportimport matplotlib.pyplot as pltfrom matplotlib.colors import ListedColormap
start = time.clock()csv.field_size_limit(500 * 1024 * 1024)
#---------------------------第一步 加载数据集------------------------#训练集file = "train_dataset.csv"label_train = []content_train = []with open(file, "r") as csv_file:    csv_reader = csv.reader(csv_file)    header = next(csv_reader)    for row in csv_reader:        label_train.append(row[1])        value = str(row[3])        content_train.append(value)print(label_train[:2])print(content_train[:2])
#测试集file = "test_dataset.csv"label_test = []content_test = []with open(file, "r") as csv_file:    csv_reader = csv.reader(csv_file)    header = next(csv_reader)    for row in csv_reader:        label_test.append(row[1])        value = str(row[3])        content_test.append(value)print(len(label_train),len(label_test))print(len(content_train),len(content_test)) #1241 650
#---------------------------第二步 向量转换------------------------contents = content_train + content_testlabels = label_train + label_test
#计算词频 min_df max_dfvectorizer = CountVectorizer()X = vectorizer.fit_transform(contents)words = vectorizer.get_feature_names()print(words[:10])print("特征词数量:",len(words))
#计算TF-IDFtransformer = TfidfTransformer()tfidf = transformer.fit_transform(X)weights = tfidf.toarray()
#---------------------------第三步 编码转换------------------------le = LabelEncoder()y = le.fit_transform(labels)X_train, X_test = weights[:1241], weights[1241:]y_train, y_test = y[:1241], y[1241:]
#---------------------------第四步 分类检测------------------------clf = RandomForestClassifier(n_estimators=5)clf.fit(X_train, y_train)pre = clf.predict(X_test)print(clf)print(classification_report(y_test, pre, digits=4))print("accuracy:")print(metrics.accuracy_score(y_test, pre))
#结果存储f1 = open("rf_test_pre.txt", "w")for n in pre:    f1.write(str(n) + "")f1.close()
f2 = open("rf_test_y.txt", "w")for n in y_test:    f2.write(str(n) + "")f2.close()
#计算时间elapsed = (time.clock() - start)print("Time used:", elapsed)
#---------------------------第五步 可视化分析------------------------#降维pca = PCA(n_components=2)pca = pca.fit(X_test)xx = pca.transform(X_test)
#画图plt.figure()plt.scatter(xx[:,0],xx[:,1],c=y_test, s=50)plt.title("Malware Family Detection")plt.show()

输出结果如下所示,效果达到了0.8092,感觉还不错。


1241 6501241 650['__anomaly__', 'accept', 'bind', 'changewindowmessagefilter', 'closesocket', 'clsidfromprogid', 'cocreateinstance', 'cocreateinstanceex', 'cogetclassobject', 'colescript_parsescripttext']特征词数量: 269RandomForestClassifier(n_estimators=5)              precision    recall  f1-score   support
           0     0.7185    0.8818    0.7918       110           1     0.9000    0.8100    0.8526       100           2     0.7963    0.7167    0.7544       120           3     0.9444    0.7846    0.8571       130           4     0.7656    0.8421    0.8020       190
    accuracy                         0.8092       650   macro avg     0.8250    0.8070    0.8116       650weighted avg     0.8197    0.8092    0.8103       650
accuracy:0.8092307692307692Time used: 2.1914324

同时,五类恶意家族进行可视化分析。然而,整个效果一般,需要进一步优化代码和维度来区分数据集,或者三维散点图,请读者自行思考。


五.总结

写到这里这篇文章就结束,希望对您有所帮助。忙碌的五月,真的很忙,项目本子论文毕业,等忙完后好好写几篇安全博客,感谢支持和陪伴,尤其是家人的鼓励和支持, 继续加油!

机器学习系统安全
本作品采用《CC 协议》,转载必须注明作者和本文链接
Cuckoo沙箱将在几秒钟内为您提供一些详细的分析结果,概述该文件在隔离环境中执行时的情况。在2010年夏天开启该工作之后,第一个测试版于2011年2月5日发布,这是Cuckoo第一次公开发布。2011年3月,在谷歌Code Summer of 2011期间,Cuckoo再次被选为蜜网项目的支持项目,在此期间Dario Fernandes加入了该项目并扩展了其功能。恶意软件在执行过程中创建、删除和下载的文件。
Mothukuri 等人在不干涉本地训练过程的前提下,运用成员推断攻击,并通过篡改其在通信过程中传输的梯度,减弱了全局模型的效能。在这些防御措施中,本文重点对差分隐私进行介绍,分析其隐私安全性能,以期进一步减小联邦学习系统中的隐私风险。横向联邦学习按照样本空间对数据集进行划分,并取出特征相同而样本不同的部分来进行训练。这些保护措施的安全性主要体现在密码学算法的安全性,同时需考虑密码学算法的时空效率。
系统安全第31篇文章介绍恶意代码攻击溯源基础知识
AI安全论文第17篇介绍Overview撰写方式及顶会精句摘抄
RSA Conference 2023将于旧金山时间4月24日正式启幕。近日,RSA Conference正式公布RSAC 2023创新沙盒竞赛的10名决赛入围者,分别为AnChain.AI、Astrix、Dazz、Endor Labs、HiddenLayer、Pangea、Relyance AI、SafeBase、Valence Security、Zama。目前,已获得种子轮600万美元的融资。可见,机器学习的威胁攻击面暴露并被利用的真实事件,是推动HiddenLayer团队成立创业公司的核心驱动力之一。据统计,2021年全球的网络攻击估计造成了6万亿美元的损失。Security Audit Reporting,安全审计报告。
系统安全第44篇介绍经典的静态特征提取工具CAPA,希望对您有所帮助
我国工业控制基础软硬件发展滞后,核心竞争力差,是提升工控领域自主安全水平的关键症结。相关工业控制基础软硬件较大程度地依赖国外引进,尤其部分涉及国家关键基础设施建设的重要行业,核心技术仍然受制于国外公司,企业自主创新能力存在欠缺,如精密采集、精准时钟、智能算法、故障定位、中断调度等。
据美国调查新闻网站“拦截者”(The Intercept)1月12日报道,美国知名人工智能企业、ChatGPT母公司OpenAI近日悄悄修改了其产品的使用条款,删除了禁止将OpenAI技术用于军事用途的条文。
https://capev2.readthedocs.io/en/latest/安全厂商沙箱二.基于逻辑回归的恶意家族检测前面的系列文章详细介绍如何提取恶意软件的静态和动态特征,包括API序列。接下来将构建机器学习模型学习API序列实现分类。基本流程如下:1.数据集整个数据集包括5类恶意家族的样本,每个样本经过先前的CAPE工具成功提取的动态API序列。
当人工智能遇上安全第7篇文章介绍基于机器学习的安全数据集,希望对您有帮助
VSole
网络安全专家