Straw's B1og.

反混淆学习-fla控制流平坦化篇

字数统计: 2k阅读时长: 10 min
2024/03/27

反混淆学习-fla控制流平坦化篇

感觉反混淆比混淆难多了。。。

现在也暂时不会怎么自己写ida-python脚本来反混淆

还是先学学看吧

deflat工具介绍

网上常用的反控制流平坦化的工具,可以直接在终端运行将经过fla编译的可执行文件转化为反混淆过的可执行文件,也支持arm架构。

下载地址:deflat: use angr to deobfuscation

1
$ python deflat.py --file 文件名 --addr 初始地址

需要一个python的angr库

因为要用到初始地址,所以就要求你的可执行文件没开PIE(随机地址)保护

(要求还挺苛刻的,估计只有比赛值得一用,实战估计全是保护全开,优点就是常用架构都能用)

流程

自己先用编译一个保护全关的经过fla过的文件

1
2
3
opt -lowerswitch -S IR/TestProgram.ll -o IR/TestProgram_lowerswitch.ll
opt -load ../Build/LLVMObfuscator.so -fla -S -enable-new-pm=0 IR/TestProgram_lowerswitch.ll -o IR/TestProgram_fla.ll
clang IR/TestProgram_fla.ll -o Bin/TestProgram_fla -fno-stack-protector -z execstack -no-pie

image-20240327201723919

用ida看看,很常规的控制流平坦化

image-20240327201828945

用工具转化一下,得到了个TestProgram_fla_unprotect_recovered文件

image-20240327202021043

确实反混淆了,效果不错

image-20240327202141816

这下面有一堆nop看来是把一些跳转指令全去了,并进行了补全

代码分析

详细学习太复杂,简单概述一下大概流程

主函数

引入:

  1. 函数的开始地址为序言的地址
  2. 序言的后继为主分发器
  3. 后继为主分发器的块为预处理器
  4. 后继为预处理器的块为真实块
  5. 无后继的块为返回块
  6. 剩下的为无用块
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
def main():
parser = argparse.ArgumentParser(description="deflat control flow script")
parser.add_argument("-f", "--file", help="binary to analyze")
parser.add_argument(
"--addr", help="address of target function in hex format")
args = parser.parse_args()

if args.file is None or args.addr is None:
parser.print_help()
sys.exit(0)

filename = args.file
start = int(args.addr, 16)

project = angr.Project(filename, load_options={'auto_load_libs': False})
# do normalize to avoid overlapping blocks, disable force_complete_scan to avoid possible "wrong" blocks
# 获取控制流
cfg = project.analyses.CFGFast(normalize=True, force_complete_scan=False)
target_function = cfg.functions.get(start)
# A super transition graph is a graph that looks like IDA Pro's CFG
supergraph = am_graph.to_supergraph(target_function.transition_graph)

base_addr = project.loader.main_object.mapped_base >> 12 << 12

# get prologue_node and retn_node
# 从控制流中获取返回块和序言
prologue_node = None
for node in supergraph.nodes():
if supergraph.in_degree(node) == 0:
prologue_node = node
if supergraph.out_degree(node) == 0 and len(node.out_branches) == 0:
retn_node = node

if prologue_node is None or prologue_node.addr != start:
print("Something must be wrong...")
sys.exit(-1)

# 主分发器,successors包括了所有正常的后续list
main_dispatcher_node = list(supergraph.successors(prologue_node))[0]

# 找到预处理器
for node in supergraph.predecessors(main_dispatcher_node):
if node.addr != prologue_node.addr:
pre_dispatcher_node = node
break
# 找到真实块和预处理块(虚假块)
relevant_nodes, nop_nodes = get_relevant_nop_nodes(
supergraph, pre_dispatcher_node, prologue_node, retn_node)
print('*******************relevant blocks************************')
print('prologue: %#x' % start)
print('main_dispatcher: %#x' % main_dispatcher_node.addr)
print('pre_dispatcher: %#x' % pre_dispatcher_node.addr)
print('retn: %#x' % retn_node.addr)
relevant_block_addrs = [node.addr for node in relevant_nodes]
print('relevant_blocks:', [hex(addr) for addr in relevant_block_addrs])
# 打印相关块信息

print('*******************symbolic execution*********************')
relevants = relevant_nodes
relevants.append(prologue_node)
relevants_without_retn = list(relevants)
relevants.append(retn_node)
relevant_block_addrs.extend([prologue_node.addr, retn_node.addr])

flow = defaultdict(list)
patch_instrs = {}
for relevant in relevants_without_retn:
print('-------------------dse %#x---------------------' % relevant.addr)
block = project.factory.block(relevant.addr, size=relevant.size)
has_branches = False
hook_addrs = set([])
for ins in block.capstone.insns:
# 不同架构的区分
if project.arch.name in ARCH_X86:
# 判断该真实块是否存在分支
if ins.insn.mnemonic.startswith('cmov'):
# only record the first one
if relevant not in patch_instrs:
patch_instrs[relevant] = ins
has_branches = True
# call指令对angr没影响,直接hook
elif ins.insn.mnemonic.startswith('call'):
hook_addrs.add(ins.insn.address)
elif project.arch.name in ARCH_ARM:
if ins.insn.mnemonic != 'mov' and ins.insn.mnemonic.startswith('mov'):
if relevant not in patch_instrs:
patch_instrs[relevant] = ins
has_branches = True
elif ins.insn.mnemonic in {'bl', 'blx'}:
hook_addrs.add(ins.insn.address)
elif project.arch.name in ARCH_ARM64:
if ins.insn.mnemonic.startswith('cset'):
if relevant not in patch_instrs:
patch_instrs[relevant] = ins
has_branches = True
elif ins.insn.mnemonic in {'bl', 'blr'}:
hook_addrs.add(ins.insn.address)

# angr符号执行,对于两个分支的进行模拟,获取后续节点
if has_branches:
tmp_addr = symbolic_execution(project, relevant_block_addrs,
relevant.addr, hook_addrs, claripy.BVV(1, 1), True)
if tmp_addr is not None:
flow[relevant].append(tmp_addr)
tmp_addr = symbolic_execution(project, relevant_block_addrs,
relevant.addr, hook_addrs, claripy.BVV(0, 1), True)
if tmp_addr is not None:
flow[relevant].append(tmp_addr)
else:
tmp_addr = symbolic_execution(project, relevant_block_addrs,
relevant.addr, hook_addrs)
if tmp_addr is not None:
flow[relevant].append(tmp_addr)

print('************************flow******************************')
for k, v in flow.items():
print('%#x: ' % k.addr, [hex(child) for child in v])

print('%#x: ' % retn_node.addr, [])

print('************************patch*****************************')
with open(filename, 'rb') as origin:
# Attention: can't transform to str by calling decode() directly. so use bytearray instead.
origin_data = bytearray(origin.read())
origin_data_len = len(origin_data)
# 打开文件,记录长度

recovery_file = filename + '_recovered'
recovery = open(recovery_file, 'wb')
# 重命名为_recovered文件并重新打开

# patch irrelevant blocks
for nop_node in nop_nodes:
fill_nop(origin_data, nop_node.addr-base_addr,
nop_node.size, project.arch)
# 如果在nop列表中全nop了

# remove unnecessary control flows
for parent, childs in flow.items():
if len(childs) == 1:
# 子节点为1的操作
parent_block = project.factory.block(parent.addr, size=parent.size)
last_instr = parent_block.capstone.insns[-1]
file_offset = last_instr.address - base_addr
# patch the last instruction to jmp
if project.arch.name in ARCH_X86:
fill_nop(origin_data, file_offset,
last_instr.size, project.arch)
patch_value = ins_j_jmp_hex_x86(last_instr.address, childs[0], 'jmp')
elif project.arch.name in ARCH_ARM:
patch_value = ins_b_jmp_hex_arm(last_instr.address, childs[0], 'b')
if project.arch.memory_endness == "Iend_BE":
patch_value = patch_value[::-1]
elif project.arch.name in ARCH_ARM64:
# FIXME: For aarch64/arm64, the last instruction of prologue seems useful in some cases, so patch the next instruction instead.
if parent.addr == start:
file_offset += 4
patch_value = ins_b_jmp_hex_arm64(last_instr.address+4, childs[0], 'b')
else:
patch_value = ins_b_jmp_hex_arm64(last_instr.address, childs[0], 'b')
if project.arch.memory_endness == "Iend_BE":
patch_value = patch_value[::-1]
patch_instruction(origin_data, file_offset, patch_value)
# 针对不同架构将父节点的最后一条指令修补为跳转至子节点的跳转指令
else:
# 子节点多个的操作
instr = patch_instrs[parent]
file_offset = instr.address - base_addr
# patch instructions starting from `cmovx` to the end of block
fill_nop(origin_data, file_offset, parent.addr +
parent.size - base_addr - file_offset, project.arch)
if project.arch.name in ARCH_X86:
# patch the cmovx instruction to jx instruction
patch_value = ins_j_jmp_hex_x86(instr.address, childs[0], instr.mnemonic[len('cmov'):])
patch_instruction(origin_data, file_offset, patch_value)

file_offset += 6
# patch the next instruction to jmp instrcution
patch_value = ins_j_jmp_hex_x86(instr.address+6, childs[1], 'jmp')
patch_instruction(origin_data, file_offset, patch_value)
elif project.arch.name in ARCH_ARM:
# patch the movx instruction to bx instruction
bx_cond = 'b' + instr.mnemonic[len('mov'):]
patch_value = ins_b_jmp_hex_arm(instr.address, childs[0], bx_cond)
if project.arch.memory_endness == 'Iend_BE':
patch_value = patch_value[::-1]
patch_instruction(origin_data, file_offset, patch_value)

file_offset += 4
# patch the next instruction to b instrcution
patch_value = ins_b_jmp_hex_arm(instr.address+4, childs[1], 'b')
if project.arch.memory_endness == 'Iend_BE':
patch_value = patch_value[::-1]
patch_instruction(origin_data, file_offset, patch_value)
elif project.arch.name in ARCH_ARM64:
# patch the cset.xx instruction to bx instruction
bx_cond = instr.op_str.split(',')[-1].strip()
patch_value = ins_b_jmp_hex_arm64(instr.address, childs[0], bx_cond)
if project.arch.memory_endness == 'Iend_BE':
patch_value = patch_value[::-1]
patch_instruction(origin_data, file_offset, patch_value)

file_offset += 4
# patch the next instruction to b instruction
patch_value = ins_b_jmp_hex_arm64(instr.address+4, childs[1], 'b')
if project.arch.memory_endness == 'Iend_BE':
patch_value = patch_value[::-1]
patch_instruction(origin_data, file_offset, patch_value)
#如果存在多个子节点,则需要进行更复杂的修补。对于x86架构,需要修改条件传送指令(如cmovx)为条件跳转指令(如jx)。对于ARM和ARM64架构,需要修改指令为条件跳转指令(如bx、b)

assert len(origin_data) == origin_data_len, "Error: size of data changed!!!"
#确保修复前后长度相同
recovery.write(origin_data)
recovery.close()
print('Successful! The recovered file: %s' % recovery_file)

获取与控制流相关的NOP节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def get_relevant_nop_nodes(supergraph, pre_dispatcher_node, prologue_node, retn_node):
# relevant_nodes = list(supergraph.predecessors(pre_dispatcher_node))
relevant_nodes = []
nop_nodes = []
for node in supergraph.nodes():
#使用块字节大小,来判断是否为真实块
if supergraph.has_edge(node, pre_dispatcher_node) and node.size > 8:
# XXX: use node.size is faster than to create a block
relevant_nodes.append(node)
continue
#排除序言,返回块,预处理器
if node.addr in (prologue_node.addr, retn_node.addr, pre_dispatcher_node.addr):
continue
nop_nodes.append(node)
#其余都加入nop
return relevant_nodes, nop_nodes

在给定的起始地址处执行符号执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def symbolic_execution(project, relevant_block_addrs, start_addr, hook_addrs=None, modify_value=None, inspect=False):

#从状态中获取当前指令指针的值,并解除指定地址的挂钩
def retn_procedure(state):
ip = state.solver.eval(state.regs.ip)
project.unhook(ip)
return
# 检查语句
def statement_inspect(state):
expressions = list(
state.scratch.irsb.statements[state.inspect.statement].expressions)
if len(expressions) != 0 and isinstance(expressions[0], pyvex.expr.ITE):
state.scratch.temps[expressions[0].cond.tmp] = modify_value
state.inspect._breakpoints['statement'] = []
# 检查挂钩
if hook_addrs is not None:
skip_length = 4
if project.arch.name in ARCH_X86:
skip_length = 5

for hook_addr in hook_addrs:
project.hook(hook_addr, retn_procedure, length=skip_length)

state = project.factory.blank_state(addr=start_addr, remove_options={
angr.sim_options.LAZY_SOLVES})
if inspect:
state.inspect.b(
'statement', when=angr.state_plugins.inspect.BP_BEFORE, action=statement_inspect)
sm = project.factory.simulation_manager(state)
sm.step()
while len(sm.active) > 0:
for active_state in sm.active:
if active_state.addr in relevant_block_addrs:
return active_state.addr
sm.step()

return None
CATALOG
  1. 1. 反混淆学习-fla控制流平坦化篇
    1. 1.1. deflat工具介绍
    2. 1.2. 流程
    3. 1.3. 代码分析
      1. 1.3.1. 主函数
      2. 1.3.2. 获取与控制流相关的NOP节点
      3. 1.3.3. 在给定的起始地址处执行符号执行