【技术分享】从一道题浅谈污点分析
#前言
之前在强网杯做过一道popmaster,在打SCTF又遇到这种类型的题目,简单总结一下这类题目的思路和一些暴力取巧的做法。
#污点分析概念
举一个简单的例子
eval(md5($_POST['a']));
在这个例子中,显然我们一眼就可以看出来
如果使用传统的正则拦截/eval\(.+\$_(POST|GET)\[.+\].+\)/,肯定是会造成误报的
然而从程序执行流的角度分析
我们的恶意数据从$_POST['a']输入,经过md5函数的处理,到达eval函数
如果没有经过md5处理,那么我们认为这是一个webshell,如果经过md5处理,那么我们认为这个eval是无效的
在这个例子中我们就引入了污点分析
污点分析可以抽象成一个三元组的形式,其中,source 即污点源,代表直接引入不受信任的数据或者机密数据到系统中;sink即污点汇聚点,代表直接产生安全敏感操作(违反数据完整性)或者泄露隐私数据到外界(违反数据保密性);sanitizer即无害处理,代表通过数据加密或者移除危害操作等手段使数据传播不再对软件系统的信息安全产生危害.污点分析就是分析程序中由污点源引入的数据是否能够不经无害处理,而直接传播到污点汇聚点.如果不能,说明系统是信息流安全的;否则,说明系统产生了隐私数据泄露或危险数据操作等安全问题.
其中$_POST['a']为source,eval()为sink,md5就是无害化处理的sanitizer
对于php反序列化漏洞这种链式漏洞来说,污点分析的效果是很好的,比如可以把__destruct/__toString作为sink,类的各种方法作为跳板,最后执行到某个类的危险方法完成利用。SCTF的 FUMO_on_the_Christmas_tree就是一个例子
#题目分析
为了方便以后有更高级的方法复现,这里留了个档
https://gist.github.com/EkiXu/3d29315909dc05b6fa71bc672f44bd1b
题目给的类很多,但是因为是随机生成的,肯定是在一定的模板基础上进行变换。根据观察我们可以归纳出几种类
source
class IuAzWYx { public object $KmORdvBb; public function __destruct() { if (is_callable([$this->KmORdvBb, 'V32HUHODH'])) @$this->KmORdvBb->V32HUHODH($_GET['uGvREuF']); }}
有__destruct魔术方法,肯定是我们的入口类也是就source了
中间类1
class dSvm6INp { public object $NxxDUWG; public function __call($name,$value) { extract([$name => 'LuRyao']); if (is_callable([$this->NxxDUWG, $iYRFwB9SQE])) call_user_func([$this->NxxDUWG, $iYRFwB9SQE], ...$value); }}
对于反序列化只关注 method 能到达的分支
class eh14yT { public object $QNgdAf0PIi; public object $KS1V49; public function WPcnd8eIc4($yu4o2GBgF) { @$yu4o2GBgF = base64_decode($yu4o2GBgF); if (is_callable([$this->QNgdAf0PIi, 'MoogQTgtd'])) @$this->QNgdAf0PIi->MoogQTgtd($yu4o2GBgF); if (is_callable([$this->KS1V49, 'xnsgfXPVM'])) @$this->KS1V49->xnsgfXPVM($yu4o2GBgF); }}
对于这个类的WPcnd8eIc4方法我们发现能到达 MoogQTgtd 方法和xnsgfXPVM 方法
因为每个方法名都是唯一的,我们只需要关注所有的function 语法树
可以抽象出节点
Node: parent: lastNode children: - xnsgfXPVM - MoogQTgtd funcName: WPcnd8eIc4 className: eh14yT fromObj: xxx operation: xxx
中间类2
class dSvm6INp { public object $NxxDUWG; public function __call($name,$value) { extract([$name => 'LuRyao']); if (is_callable([$this->NxxDUWG, $iYRFwB9SQE])) call_user_func([$this->NxxDUWG, $iYRFwB9SQE], ...$value); }}
对于他的调用方式一定是 xxx->iYRFwB9SQE() 然后转到 $this->NxxDUWG->LuRyao() 那么实际上__call = iYRFwB9SQE
因此可以同样抽象成为
Node: parent: lastNode children: - LuRyao funcName: iYRFwB9SQE className: dSvm6INp fromObj:xxx
sink
class xyRFyIcPw2 { public object $wdXqOvQVN; public function IvQQWGgFBF($SQuW0dv9yk) { if(stripos($SQuW0dv9yk, "/fumo") === 0) readfile(strtolower($SQuW0dv9yk)); }}
这种类涉及到readfile这种危险操作,可以把他作为sink
#“土法”分析
为了更好的分析程序执行流,这里直接使用phpPaser来生成ast树
require_once "vendor/autoload.php";use PhpParser\Error;use PhpParser\NodeDumper;use PhpParser\ParserFactory; $code = file_get_contents($argv[1]); $parser = (new ParserFactory)->create(ParserFactory::PREFER_PHP7);try { $ast = $parser->parse($code);} catch (Error $error) { echo "Parse error: {$error->getMessage()}"; return;} echo json_encode($ast, JSON_PRETTY_PRINT), "";
其实phpParser也支持ast vistor,但是我对python比较熟悉,接下来所以导出语法树json给python使用
然后我们的思想也很土法暴力,就是从source开始,匹配所有可能的children,进行bfs,最后当到达sink时,停止查找并输出exp
同上面我们抽象的得到的构造搜索的状态节点如下
class Node: def __init__(self,parent:"Node",funcName:str,className:str,fromObj:str,operation:str="") -> None: self.parent:"Node" = parent self.funcname = funcName self.className = className self.children:Set[Node] = {} self.fromObj = fromObj self.operation = operation def __repr__(self) -> str: return f"Class:{self.className} Func:{self.funcname}"
不过这里的children其实没用上,用parent就能描述这棵搜索树了
然后就是bfs 加上了几种剪枝方法,这部分改写自popmaster。其实这种类型的题目都是通用的,因为我们的方法比较暴力,实际上并没有执行流,而是通过分析ast结构匹配不同的语句,所以遇到不同的模板要针对性的剪枝,当然也可以根据不想要的方法名进行剪枝,以及避免循环加一个访问过的标记剪枝。
while not q.empty(): cur:Node = q.get() #print(cur) #循环剪枝 if hasattr(visit_function,cur.className+"::"+cur.funcname): continue else: visit_function[cur.className+"::"+cur.funcname] = True #黑名单剪枝 if cur.funcname in ban_function: continue if cur.funcname in know_children.keys(): for fromObj,child in know_children[cur.funcname]: q.put(Node(cur,child,f2cTable[child],fromObj)) continue cur_function_ast = functions_ast[cur.funcname] nxt_function = '' if len(cur_function_ast['params']) > 0: cur_param_name = cur_function_ast['params'][0]['var']['name'] '''对于 public function RXSpM5x($gEhgFIW) { @$gEhgFIW = str_rot13($gEhgFIW); if (is_callable([$this->vV0AU2p, 'OUIxcaUx'])) @$this->vV0AU2p->OUIxcaUx($gEhgFIW); } ''' #遍历当前函数方法 for stmt in cur_function_ast['stmts']: # xxx; if stmt['nodeType'] == 'Stmt_Expression': expr = stmt['expr'] if expr['nodeType'] == 'Expr_ErrorSuppress': expr = stmt['expr']['expr'] if expr['nodeType'] == "Expr_Assign" and expr['var']['name'] == cur_param_name: #@$Cz9slGovKv = md5($Cz9slGovKv); 剪枝 if expr['expr']['nodeType'] == 'Expr_FuncCall': iexpr = expr['expr'] if get_funccall_name(iexpr) in ['md5','sha1','crypt','base64_encode']: break else: cur.operation += get_funccall_name(iexpr) # @$DP1 = $WE; 剪枝 elif expr['expr']['nodeType'] == 'Expr_Variable' and expr['expr']['name'] != cur_param_name: break # 对于if (is_callable([$this->WShwfIFWB, 'GoPpYf35GF'])) @$this->WShwfIFWB->GoPpYf35GF($Cz9slGovKv); elif stmt['nodeType'] == 'Stmt_If': if stmt['cond']['nodeType'] == "Expr_FuncCall": if get_funccall_name(stmt['cond']) == 'is_callable': for istmt in stmt['stmts']: if istmt['nodeType'] == "Stmt_Expression": param_name = istmt['expr']['expr']['var']['name']['name'] nxt_function_name = istmt['expr']['expr']['name']['name'] #print(param_name) q.put(Node(cur,nxt_function_name,f2cTable[nxt_function_name],param_name)) #到达 if(stripos($ZVuRXoI, "/fumo") === 0) readfile(strtolower($ZVuRXoI)); 也即sink elif stmt['cond']['nodeType'] == 'Expr_BinaryOp_Identical': left = stmt['cond']['left'] if get_funccall_name(left) == 'stripos': #Ok print("Ok") printGraph(cur) printExp(cur) exit(0) #print(curState.className,'->',f2cTable[next_f]) #exit(0)
可以看到这里还有个known_children集,是因为对_call函数处理的方法比较特殊
for name,ast in classes_ast.items(): for f in ast["stmts"]: if f['nodeType'] == "Stmt_ClassMethod": class_method_name = f['name']['name'] children_method_name = None fromObj = None if class_method_name == '__call': # extract([$name => 'nglEtPMT']); for stmt in f['stmts']: if stmt['nodeType'] == 'Stmt_Expression': expr = stmt['expr'] if expr['nodeType'] == 'Expr_FuncCall' and expr['name']['parts'][0]=='extract': children_method_name = expr['args'][0]['value']['items'][0]['value']['value'] fromObj = ast['stmts'][0]['props'][0]['name']['name'] elif stmt['nodeType'] == 'Stmt_If' and get_funccall_name(stmt['cond']) == 'is_callable': expr = stmt['cond'] class_method_name = expr['args'][0]['value']['items'][1]['value']['name'] break if fromObj!=None and children_method_name!=None: know_children[class_method_name] = [(fromObj,children_method_name)] f2cTable[class_method_name] = name functions_ast[class_method_name] = f
还是刚才这个例子
class dSvm6INp { public object $NxxDUWG; public function __call($name,$value) { extract([$name => 'LuRyao']); if (is_callable([$this->NxxDUWG, $iYRFwB9SQE])) call_user_func([$this->NxxDUWG, $iYRFwB9SQE], ...$value); }}
对于我们直接在语法树中取LuRyao为下个节点的方法,并将这个__call等价为iYRFwB9SQE,直接计算其children
为了方便通过函数名反查类名,以及自动打印exp,我们还需要一些其他工具
完整脚本如下
import jsonimport queuefrom typing import Set root_ast = json.load(open('res.json','rb')) class_ast = root_ast[0]["stmts"] classes_ast = {} def get_funccall_name(expr): if expr['nodeType'] == "Expr_FuncCall": return expr['name']['parts'][0] print(expr) raise Exception("Not FunCall") class Node: def __init__(self,parent:"Node",funcName:str,className:str,fromObj:str,operation:str="") -> None: self.parent:"Node" = parent self.funcname = funcName self.className = className self.children:Set[Node] = {} self.fromObj = fromObj self.operation = operation def __repr__(self) -> str: return f"Class:{self.className} Func:{self.funcname}" def gen_decode_exp(encode_way): if encode_way == 'base64_decode': return 'base64_encode' elif encode_way == 'strrev': return 'strrev' elif encode_way == 'str_rot13': return 'str_rot13' elif encode_way == 'base64_encode': return 'base64_decode' elif encode_way == 'ucfirst': return 'lcfirst' else: print("Unkown Encode Way",encode_way) return "" def printGraph(sink:Node)->bool: cur = sink decode_exp = '"/flag"' while True: print(f"{cur.className}::{cur.funcname} [{cur.operation}]<-",end='') cur = cur.parent if cur == None: break if cur.operation != "": decode_exp = f"{gen_decode_exp(cur.operation)}({decode_exp})" print('') print(decode_exp) def printExp(cur:Node): ret = f"$my{cur.className} = new \christmasTree\{cur.className};\r" for stmt in classes_ast[cur.className]['stmts']: if stmt['nodeType'] == 'Stmt_Property': propName = stmt['props'][0]['name']['name'] ret += f"$my{cur.className}->{propName} = new StdClass;\r" parent:Node = cur.parent while parent != None: tmp = f"$my{parent.className} = new \christmasTree\{parent.className};\r" for stmt in classes_ast[parent.className]['stmts']: if stmt['nodeType'] == 'Stmt_Property': propName = stmt['props'][0]['name']['name'] tmp += f"$my{parent.className}->{propName} = new StdClass;\r" tmp += f"$my{parent.className}->{cur.fromObj} = $my{cur.className};\r" cur = parent parent = parent.parent ret +=tmp print(ret) for i in range(len(class_ast)): classes_ast[class_ast[i]['name']['name']] = class_ast[i] #根据f反查classf2cTable = {}#ban_function = ['YkN508','frZNkk','N3R1ow','N3R1ow','CCO4GN','LCG9b8','ILnKZX'] ban_function = [] know_children = {} functions_ast = {} for name,ast in classes_ast.items(): for f in ast["stmts"]: if f['nodeType'] == "Stmt_ClassMethod": class_method_name = f['name']['name'] children_method_name = None fromObj = None if class_method_name == '__call': # extract([$name => 'nglEtPMT']); for stmt in f['stmts']: if stmt['nodeType'] == 'Stmt_Expression': expr = stmt['expr'] if expr['nodeType'] == 'Expr_FuncCall' and expr['name']['parts'][0]=='extract': children_method_name = expr['args'][0]['value']['items'][0]['value']['value'] fromObj = ast['stmts'][0]['props'][0]['name']['name'] elif stmt['nodeType'] == 'Stmt_If' and get_funccall_name(stmt['cond']) == 'is_callable': expr = stmt['cond'] class_method_name = expr['args'][0]['value']['items'][1]['value']['name'] break if fromObj!=None and children_method_name!=None: know_children[class_method_name] = [(fromObj,children_method_name)] f2cTable[class_method_name] = name functions_ast[class_method_name] = f #print(f2cTable)#print(know_children)#exit(0) visit_function = {} q = queue.SimpleQueue() initState = Node(None,"__destruct",f2cTable['__destruct'],None) q.put(initState) while not q.empty(): cur:Node = q.get() #print(cur) #循环剪枝 if hasattr(visit_function,cur.className+"::"+cur.funcname): continue else: visit_function[cur.className+"::"+cur.funcname] = True #黑名单剪枝 if cur.funcname in ban_function: continue if cur.funcname in know_children.keys(): for fromObj,child in know_children[cur.funcname]: q.put(Node(cur,child,f2cTable[child],fromObj)) continue cur_function_ast = functions_ast[cur.funcname] nxt_function = '' if len(cur_function_ast['params']) > 0: cur_param_name = cur_function_ast['params'][0]['var']['name'] ''' public function RXSpM5x($gEhgFIW) { @$gEhgFIW = str_rot13($gEhgFIW); if (is_callable([$this->vV0AU2p, 'OUIxcaUx'])) @$this->vV0AU2p->OUIxcaUx($gEhgFIW); } ''' #遍历当前函数方法 for stmt in cur_function_ast['stmts']: # xxx; if stmt['nodeType'] == 'Stmt_Expression': expr = stmt['expr'] if expr['nodeType'] == 'Expr_ErrorSuppress': expr = stmt['expr']['expr'] if expr['nodeType'] == "Expr_Assign" and expr['var']['name'] == cur_param_name: #@$Cz9slGovKv = md5($Cz9slGovKv); if expr['expr']['nodeType'] == 'Expr_FuncCall': iexpr = expr['expr'] if get_funccall_name(iexpr) in ['md5','sha1','crypt','base64_encode']: break else: cur.operation += get_funccall_name(iexpr) # @$DP1 = $WE; elif expr['expr']['nodeType'] == 'Expr_Variable' and expr['expr']['name'] != cur_param_name: break # if (is_callable([$this->WShwfIFWB, 'GoPpYf35GF'])) @$this->WShwfIFWB->GoPpYf35GF($Cz9slGovKv); elif stmt['nodeType'] == 'Stmt_If': if stmt['cond']['nodeType'] == "Expr_FuncCall": if get_funccall_name(stmt['cond']) == 'is_callable': for istmt in stmt['stmts']: if istmt['nodeType'] == "Stmt_Expression": param_name = istmt['expr']['expr']['var']['name']['name'] nxt_function_name = istmt['expr']['expr']['name']['name'] #print(param_name) q.put(Node(cur,nxt_function_name,f2cTable[nxt_function_name],param_name)) # if(stripos($ZVuRXoI, "/fumo") === 0) readfile(strtolower($ZVuRXoI)); elif stmt['cond']['nodeType'] == 'Expr_BinaryOp_Identical': left = stmt['cond']['left'] if get_funccall_name(left) == 'stripos': #Ok print("Ok") printGraph(cur) printExp(cur) exit(0) #print(curState.className,'->',f2cTable[next_f]) #exit(0)
效果如下
后话
对于这类污点分析,codeql是比较好用的工具,但是受限于php的灵活性(比如在这个例子中比较搞的extract和_call魔术方法),codeql并没有支持php。因此在比赛中使用了比较暴力的方法进行分析。希望本篇文章抛砖引玉,更够学习到大佬们更高妙的方法。
参考资料
简单理解污点分析技术:https://www.k0rz3n.com/2019/03/01/%E7%AE%80%E5%8D%95%E7%90%86%E8%A7%A3%E6%B1%A1%E7%82%B9%E5%88
