Angr符号执行练习–对付OLLVM Control Flow Flattening/控制流平坦化
文章探讨了控制流平坦化(CFF)技术及其反混淆方法。通过示例程序`hello.c`生成被CFF混淆的二进制文件`hello_fla`,并利用`angr`框架和自定义脚本`hello_fla_patch.py`恢复其控制流结构。此外,介绍了IDAPython插件D810的应用,展示了如何借助规则集进行反混淆处理。最终实现了对混淆代码的逆向分析。 2025-8-11 10:45:46 Author: blog.nsfocus.net(查看原文) 阅读量:13 收藏

阅读: 4

创建: 2025-07-31 12:22
更新:

————————————————————————–

目录:

☆ 背景介绍
☆ hello.c
☆ hello_fla
☆ hello_fla_patch.py
☆ IDAPython插件D810
1) 安装D810插件
2) 使用D810插件
3) 借助AI理解D810框架结构
☆ 后记

————————————————————————–

☆ 背景介绍

参看

————————————————————————–
Control Flow Flattening (CFF)
https://github.com/obfuscator-llvm/obfuscator/wiki/Control-Flow-Flattening

Deobfuscation: recovering an OLLVM-protected program – Francis Gabriel [2014-12-04]
https://blog.quarkslab.com/deobfuscation-recovering-an-ollvm-protected-program.html
————————————————————————–

控制流平坦化将正常控制流转换成状态变量驱动,夹杂状态变量混淆,使得IDA F5结
果非人类可读,但不影响原有代码逻辑,顶多有些性能损耗。CFF目的是对抗静态分
析。

本文以学习angr进阶用法为目的,借CFF反混淆为靶标。

$ pip3 show angr | grep Version
Version: 9.2.125.dev0

☆ hello.c

————————————————————————–
#include <stdio.h>
#include <stdlib.h>

static unsigned int foo ( unsigned int n )
{
unsigned int mod = n % 4;
unsigned int ret = 0;

if ( mod == 0 )
{
ret = ( n | 0xbaaad0bf ) * ( 2 ^ n );
}
else if ( mod == 1 )
{
ret = ( n & 0xbaaad0bf ) * ( 3 + n );
}
else if ( mod == 2 )
{
ret = ( n ^ 0xbaaad0bf ) * ( 4 | n );
}
else
{
ret = ( n + 0xbaaad0bf ) * ( 5 & n );
}
return ret;
}

int main ( int argc, char * argv[] )
{
unsigned int n,
ret;

if ( argc < 2 )
{
fprintf( stderr, “Usage: %s <num>\n”, argv[0] );
return -1;
}
n = (unsigned int)strtoul( argv[1], NULL, 0 );
ret = foo( n );
fprintf( stdout, “n=%#x ret=%#x\n”, n, ret );
return 0;
}
————————————————————————–

clang -pipe -O0 -s -mllvm -passes=fla -o hello_fla hello.c

用某版OLLVM启用fla编译,得到hello_fla。

完整测试用例打包

https://scz.617.cn/unix/202507311222.txt
https://scz.617.cn/unix/202507311222.7z

☆ hello_fla

$ file -b hello_fla
ELF 64-bit LSB executable, x86-64, …, stripped

IDA64反汇编hello_fla

————————————————————————–
__int64 __fastcall main(int a1, char **a2, char **a3)
{
int v3;
int v5;
unsigned int v6;
unsigned int v7;
unsigned int v8;

v8 = 0;
v5 = 0xF2AB2D56;
while ( 1 )
{
while ( v5 == 0xA6509F46 )
{
fprintf(stderr, “Usage: %s <num>\n”, *a2);
v8 = -1;
v5 = 0xE926118E;
}
if ( v5 == 0xE926118E )
break;
if ( v5 == 0xF2AB2D56 )
{
v3 = 0x64B7B86A;
if ( a1 < 2 )
v3 = 0xA6509F46;
v5 = v3;
}
else
{
v7 = strtoul(a2[1], 0LL, 0);
v6 = sub_401270(v7);
fprintf(stdout, “n=%#x ret=%#x\n”, v7, v6);
v8 = 0;
v5 = 0xE926118E;
}
}
return v8;
}

__int64 __fastcall sub_401270(int a1)
{
int v1;
int v2;
int v3;
int v5;
unsigned int v6;
int v7;

v7 = a1 & 3;
v6 = 0;
v5 = 0x1D861884;
while ( v5 != 0x95AD57E0 )
{
switch ( v5 )
{
case 0xBBF4F2F8:
v6 = (a1 + 3) * (a1 & 0xBAAAD0BF);
v5 = 0xCE9DE31;
break;
case 0xC915711E:
v3 = 0x54BE0661;
if ( v7 == 2 )
v3 = 0xD6927C4E;
v5 = v3;
break;
case 0xD6927C4E:
v6 = (a1 | 4) * (a1 ^ 0xBAAAD0BF);
v5 = 0x5B1C3258;
break;
case 0xCE9DE31:
v5 = 0x95AD57E0;
break;
case 0x134D92DC:
v6 = (a1 ^ 2) * (a1 | 0xBAAAD0BF);
v5 = 0x95AD57E0;
break;
case 0x1D861884:
v1 = 0x1F74CBC8;
if ( (a1 & 3) == 0 )
v1 = 0x134D92DC;
v5 = v1;
break;
case 0x1F74CBC8:
v2 = 0xC915711E;
if ( v7 == 1 )
v2 = 0xBBF4F2F8;
v5 = v2;
break;
case 0x54BE0661:
v6 = (a1 & 5) * (a1 – 0x45552F41);
v5 = 0x5B1C3258;
break;
default:
v5 = 0xCE9DE31;
break;
}
}
return v6;
}
————————————————————————–

F5的伪代码没必要深究,看个大概即可。

☆ hello_fla_patch.py

这是对付hello_fla的完整代码,演示性质,非通用实现。

hello_fla_patch.py实际源自

https://github.com/cq674350529/deflat/blob/master/flat_control_flow/deflat.py

am_graph模块实际源自

https://github.com/angr/angr-management/blob/master/angrmanagement/utils/graph.py

————————————————————————–
#!/usr/bin/env python
# -*- encoding: utf-8 -*-

import sys, collections
import angr, claripy, pyvex
import am_graph

def get_func_from_addr ( proj, addr ) :
try :
return proj.kb.functions.get_by_addr( addr )
except KeyError :
return proj.kb.functions.floor_func( addr )

#
# threshold是所有”有效块”中最小字节长度
#
def get_relevant_nop_nodes ( supergraph, pre_dispatcher_node, prologue_node, retn_node, threshold ) :
#
# relevant_nodes = list( supergraph.predecessors( pre_dispatcher_node ) )
#
relevant_nodes = []
nop_nodes = []
for node in supergraph.nodes() :
if node.addr in ( prologue_node.addr, retn_node.addr, pre_dispatcher_node.addr ) :
continue
#
# 靠threshold快速过滤、排除”非有效块”
#
if supergraph.has_edge( node, pre_dispatcher_node ) and node.size > threshold :
relevant_nodes.append( node )
else :
nop_nodes.append( node )
return relevant_nodes, nop_nodes

#
# 通过符号执行寻找下一跳
#
# 若CFF的状态转换变量在某个case中不是简单赋值,而是与当前case值进行计算所
# 得,下面的符号执行方案可能有问题,涉及如何初始化的问题。
#
def symbolic_execution ( proj, keep_blocks, start_addr, hook_addrs, set_value=None ) :

def retn_procedure ( state ) :
#
# ip = state.solver.eval( state.regs.ip )
# proj.unhook( ip )
#
proj.unhook( state.addr )
return

def statement_inspect ( state ) :
#
# state.scratch.irsb是正在处理的IR SuperBlock (IRSB)的VEX IR表示。
# 一个IRSB包含一系列IR语句。
#
# 当’statement’类型的检查点触发时,在state.inspect.statement中存储
# 当前正在处理的IR语句的索引。
#
# state.scratch.irsb.statements[]是个数组
# state.inspect.statement是int,是个索引
#
# 每个IR语句可能包含一个或多个表达式(expressions)
#
expressions = list( state.scratch.irsb.statements[state.inspect.statement].expressions )
#
# if…then…else
#
if len( expressions ) != 0 and isinstance( expressions[0], pyvex.expr.ITE ) :
#
# state.scratch.temps[]用于存储VEX IR临时变量值
#
# ITE表达式的cond属性代表条件表达式本身。下面这个值决定ITE表达
# 式走then分支还是else分支。
#
state.scratch.temps[expressions[0].cond.tmp] = set_value
state.inspect._breakpoints[‘statement’] = []

if hook_addrs :
for addr in hook_addrs :
#
# 假设call指令占5字节
#
proj.hook( addr, retn_procedure, length=5 )

init_state = proj.factory.blank_state(
addr = start_addr,
add_options = {
angr.options.SYMBOL_FILL_UNCONSTRAINED_MEMORY,
angr.options.SYMBOL_FILL_UNCONSTRAINED_REGISTERS,
angr.options.BYPASS_UNSUPPORTED_SYSCALL,
},
remove_options = {
angr.options.LAZY_SOLVES,
}
)
if set_value is not None :
init_state.inspect.b( ‘statement’, when=angr.BP_BEFORE, action=statement_inspect )

sm = proj.factory.simulation_manager( init_state )
sm.step()
while len( sm.active ) > 0 :
for state in sm.active :
if state.addr in keep_blocks :
return state.addr
sm.step()

return None

#
# 恢复控制流
#
def get_flow ( proj, prologue_node, relevant_nodes, retn_node ) :

#
# 本例实测下来,target不含序言块也可以,但稳妥起见,还是含序言块
#
symbolic_execution_target \
= [prologue_node]
symbolic_execution_target.extend( relevant_nodes )

keep_blocks = [node.addr for node in relevant_nodes]
#
# keep_blocks包含返回块,不包含主分发器、预处理器
#
keep_blocks.extend( [retn_node.addr,] )
print( f’keep_blocks[{len(keep_blocks)}]:’ )
for i, addr in enumerate( keep_blocks ) :
print( f'[{i}] {addr:#x}’ )
#
# 从list转成set,提高后面的检查效率,实测并不明显
#
keep_blocks = set( keep_blocks )

flow = collections.defaultdict( list )
ins_dict = {}
for node in symbolic_execution_target :
block = proj.factory.block( node.addr, size=node.size )
has_branch = False
hook_addrs = set()
for ins in block.capstone.insns :
if ins.mnemonic.startswith( ‘cmov’ ) :
#
# only record the first one
#
if node not in ins_dict :
ins_dict[node] = ins
#
# 发现comv*系列
#
has_branch = True
elif ins.mnemonic.startswith( ‘call’ ) :
hook_addrs.add( ins.address )

if has_branch :
next_addr = symbolic_execution(
proj,
keep_blocks,
node.addr,
hook_addrs,
claripy.BVV( 1, 1 )
)
if next_addr is not None :
flow[node].append( next_addr )
next_addr = symbolic_execution(
proj,
keep_blocks,
node.addr,
hook_addrs,
claripy.BVV( 0, 1 )
)
if next_addr is not None :
flow[node].append( next_addr )
else :
next_addr = symbolic_execution(
proj,
keep_blocks,
node.addr,
hook_addrs
)
if next_addr is not None :
flow[node].append( next_addr )
return ( flow, ins_dict, )

OPCODES = {
‘a’ : b’\x87′,
‘ae’ : b’\x83′,
‘b’ : b’\x82′,
‘be’ : b’\x86′,
‘c’ : b’\x82′,
‘e’ : b’\x84′,
‘z’ : b’\x84′,
‘g’ : b’\x8f’,
‘ge’ : b’\x8d’,
‘l’ : b’\x8c’,
‘le’ : b’\x8e’,
‘na’ : b’\x86′,
‘nae’ : b’\x82′,
‘nb’ : b’\x83′,
‘nbe’ : b’\x87′,
‘nc’ : b’\x83′,
‘ne’ : b’\x85′,
‘ng’ : b’\x8e’,
‘nge’ : b’\x8c’,
‘nl’ : b’\x8d’,
‘nle’ : b’\x8f’,
‘no’ : b’\x81′,
‘np’ : b’\x8b’,
‘ns’ : b’\x89′,
‘nz’ : b’\x85′,
‘o’ : b’\x80′,
‘p’ : b’\x8a’,
‘pe’ : b’\x8a’,
‘po’ : b’\x8b’,
‘s’ : b’\x88′,
‘jmp’ : b’\xe9′,
‘j’ : b’\x0f’,
‘nop’ : b’\x90′,
}

def fill_nop ( proj, buf, addr, size ) :
off = proj.loader.main_object.addr_to_offset( addr )
buf[off:off+size] \
= OPCODES[‘nop’] * size

def get_j_ins ( f_addr, t_addr, j_type ) :
if ‘jmp’ == j_type :
j_opcode = OPCODES[‘jmp’]
j_size = 5
else :
j_opcode = OPCODES[‘j’] + OPCODES[j_type]
j_size = 6
j_off = t_addr – f_addr – j_size
#
# struct.pack( ‘<i’, j_off )
#
j_ins = j_opcode + j_off.to_bytes( 4, byteorder=’little’, signed=True )
return j_ins

def patch_ins ( proj, buf, addr, ins ) :
off = proj.loader.main_object.addr_to_offset( addr )
size = len( ins )
buf[off:off+size] \
= ins

def patch_buf ( proj, buf, nop_nodes, flow, ins_dict ) :
for node in nop_nodes :
fill_nop( proj, buf, node.addr, node.size )

for parent, children in flow.items() :
if 1 == len( children ) :
block = proj.factory.block( parent.addr, size=parent.size )
#
# 块的最后一条指令
#
ins = block.capstone.insns[-1]
#
# 若symbolic_execution_target含序言块,此assert不成立
#
# assert ‘jmp’ == ins.mnemonic

#
# patch the last instruction to jmp
#
j_ins = get_j_ins( ins.address, children[0], ‘jmp’ )
assert ins.size >= len( j_ins )
#
# 此处NOP化并非必要,保守起见,建议NOP化
#
fill_nop( proj, buf, ins.address, ins.size )
patch_ins( proj, buf, ins.address, j_ins )
else :
ins = ins_dict[parent]
#
# patch instructions starting from cmovx to the end of block
#
fill_nop( proj, buf, ins.address, parent.addr + parent.size – ins.address )
#
# patch the cmovx to jx
#
j_ins = get_j_ins( ins.address, children[0], ins.mnemonic[len(‘cmov’):] )
patch_ins( proj, buf, ins.address, j_ins )
#
# patch the next instruction to jmp instrcution
#
j_ins = get_j_ins( ins.address+6, children[1], ‘jmp’ )
patch_ins( proj, buf, ins.address+6, j_ins )

def dosth ( proj, buf, addr ) :

print( f’func {addr:#x}’ )

func = get_func_from_addr( proj, addr )
#
# A super transition graph is a graph that looks like IDA CFG, where
# calls to returning functions do not terminate basic blocks.
#
supergraph = am_graph.to_supergraph( func.transition_graph )

#
# get prologue_node and retn_node
#
prologue_node = None
retn_node = None
for node in supergraph.nodes() :
if 0 == supergraph.in_degree( node ) :
prologue_node = node
if 0 == supergraph.out_degree( node ) and len( node.out_branches ) == 0 :
retn_node = node

assert prologue_node is not None
assert retn_node is not None
print( ‘prologue_node: %#x’ % prologue_node.addr )
print( ‘retn_node: %#x’ % retn_node.addr )

#
# 序言的后继为主分发器
#
main_dispatcher_node \
= list( supergraph.successors( prologue_node ) )[0]
print( ‘main_dispatcher_node: %#x’ % main_dispatcher_node.addr )

pre_dispatcher_node \
= None
#
# 后继为主分发器的非序言块为预处理器
#
for node in supergraph.predecessors( main_dispatcher_node ) :
if node.addr != prologue_node.addr :
pre_dispatcher_node = node
break

assert pre_dispatcher_node is not None
print( ‘pre_dispatcher_node: %#x’ % pre_dispatcher_node.addr )

relevant_nodes, nop_nodes \
= get_relevant_nop_nodes( supergraph, pre_dispatcher_node, prologue_node, retn_node, 12 )

flow, ins_dict = get_flow( proj, prologue_node, relevant_nodes, retn_node )
print( f’flow[{len(flow)}]:’ )
for i, ( k, v ) in enumerate( flow.items() ) :
print( ‘[%d] %#x ->’ % ( i, k.addr ), [hex(child) for child in v] )

#
# 修改buf
#
patch_buf( proj, buf, nop_nodes, flow, ins_dict )
print( ” )

def main ( argv ) :
base_addr = 0x400000
#
# proj.arch.name == ‘AMD64’
#
proj = angr.Project(
argv[1],
load_options = {
‘auto_load_libs’ : False,
‘main_opts’ : {
‘base_addr’ : base_addr
}
}
)
cfg = proj.analyses.CFG(
force_smart_scan = False,
force_complete_scan = True,
normalize = True,
resolve_indirect_jumps \
= True,
fail_fast = True
)

with open( argv[1], ‘rb’ ) as f :
buf = bytearray( f.read() )
origsize = len( buf )

#
# 对应若干需要Patch的函数
#
addrlist = ( 0x401140, 0x401270, )
for addr in addrlist :
dosth( proj, buf, addr )

#
# 防止误增加buf长度
#
assert len( buf ) == origsize
with open( argv[2], ‘wb’ ) as f :
f.write( buf )

if “__main__” == __name__ :
main( sys.argv )
————————————————————————–

python3 hello_fla_patch.py hello_fla hello_fla_new

IDA64反汇编hello_fla_new_*,F5查看main、sub_401270,已能看出hello.c所展示
的代码逻辑。

☆ IDAPython插件D810

参看

————————————————————————–
D810: Creating an extensible deobfuscation plugin for IDA Pro
https://eshard.com/posts/d810-deobfuscation-ida-pro

D810: A journey into control flow unflattening – Boris Batteaux
https://eshard.com/posts/D810-a-journey-into-control-flow-unflattening

https://gitlab.com/eshard/d810
————————————————————————–

D810是IDAPython插件,利用Hex-Rays微码技术反控制流平坦化。

1) 安装D810插件

安装过程就是复制D810.py及d810子目录到

X:\Green\IDA\plugins\

需在IDAPython环境中安装z3-solver模块

2) 使用D810插件

呼出D810的GUI

————————————————————————–
Edit->Plugins->D-810 (Ctrl+Shift+D)
Current file loaded
X:\Green\IDA\plugins\d810\conf\default_unflattening_ollvm.json
Start
红色的”Not loaded”变成绿色的”Loaded”
————————————————————————–

对付hello_fla时,用default_unflattening_ollvm.json即可;一般要根据目标二进
制选相应的反CFF规则,不熟的就挨个试。选完规则,点击Start,启用规则,默认是
Stop状态。最后F5反编译目标函数。

使用D810插件默认会生成日志,保存在

X:\Green\IDA\plugins\d810_logs\

假设增加自定义规则,需编辑两个文件

X:\Green\IDA\plugins\d810\conf\

options.json // 配置中增加”Test_Analysis.json”的条目
Test_Analysis.json // 自定义规则

3) 借助AI理解D810框架结构

参看

《让AI协助阅读代码》
https://scz.617.cn/misc/202503101024.txt

访问

https://gitseek.dev/zh

向它提交D810项目源码所在网址

https://gitlab.com/eshard/d810

得到合并后的D810项目源码

eshard-d810-prompt.txt

将上述文件上传至aistudio。依次提问:

————————————————————————–
这是D810的项目,我需要学习该项目。请用中文交流,解释该项目框架结构。

D810总是自动生成d810_logs,这个行为怎么控制?

D810是不是有个什么配置,可以帮你分析样本中用到哪些pattern?

详细解释unflattener.py
————————————————————————–

我对D810作者姓名不敏感,据bluerust说,“看了一眼D810,大为震撼,赶紧搜了搜
作者,果然是巨佬”。

☆ 后记

控制流平坦化与反控制流平坦化是场猫鼠游戏,本文只是入门级原理演示。现实世界
中有许多魔改过的CFF实现,源自中国大陆地区各大互联网厂商及黑灰产从业人员。

建议学习D810源码,了解Hex-Rays微码技术,针对魔改CFF实现,编写自定义规则对
抗之。


文章来源: https://blog.nsfocus.net/angr%e7%ac%a6%e5%8f%b7%e6%89%a7%e8%a1%8c%e7%bb%83%e4%b9%a0-%e5%af%b9%e4%bb%98ollvm-control-flow-flattening-%e6%8e%a7%e5%88%b6%e6%b5%81%e5%b9%b3%e5%9d%a6%e5%8c%96/
如有侵权请联系:admin#unsafe.sh