0%

使用frida捕获动态模块中解码的二进制socket报文

安装依赖

1
2
3
pip install frida
pip install frida-tools
pip install umsgpack

frida脚本

目标是在 AsyncNet.win32 (其实就是个dll模块)中的 asn_core_read 调用返回之后,读取其写入缓冲区的内容,解析然后输出来

asn_core_read 的函数签名如下:

1
2
3
4
5
6
7
8
int asn_core_read(
void* asn_core_obj_ptr,
int* event_ptr,
long* wparam_ptr,
long* lparam_ptr,
uint8_t* buffer,
int buffer_size
);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# hook_asn_core_read.py

import os
import sys
import json
import zlib
import struct
import frida
import umsgpack
import logging


frida_session = None

def on_receive_im_message(message, data):
def _event_code_desc(code):
return {
0: 'ASYNC_EVT_NEW',
1: 'ASYNC_EVT_LEAVE',
2: 'ASYNC_EVT_ESTAB',
3: 'ASYNC_EVT_DATA',
4: 'ASYNC_EVT_PROGRESS',
5: 'ASYNC_EVT_PUSH',
}.get(code, str(code))

if 'payload' not in message:
return

payload = json.loads(message['payload'])

if payload['type'] == 'exit':
# if frida_session:
# frida_session.detach()
# os._exit(1)
pass
elif payload['type'] == 'msg':
buffer = bytes(payload["buffer"])

appid, cmd, eid, uid, flag, version = struct.unpack("<HHIIII", buffer[:20])
plaintext = buffer[20:]

if flag & 0x10: # FLAG_COMPRESS
plaintext = zlib.decompress(plaintext)

plaintext = umsgpack.unpackb(plaintext)

logging.info(f'event : {_event_code_desc(payload["event"])}, hid : {payload["hid"]}, tag : {payload["tag"]}')
logging.info(f'appid : {appid}, cmd : {cmd}, eid : {eid}, uid : {uid}, flag : {flag}, version : {version}')
logging.info(f'{plaintext}')

print('')

def main(target_process):
global frida_session

frida_session = frida.attach(target_process)
script = frida_session.create_script("""
const async_net_base_addr = Module.findBaseAddress('AsyncNet.win32');
if (!async_net_base_addr) {
console.error("AsyncNet.win32 module it's not loaded");
send(JSON.stringify({type: "exit"}));
Script.stop();
}

const asn_core_read_addr = Module.findExportByName('AsyncNet.win32', 'asn_core_read');
if (!asn_core_read_addr) {
console.error("asn_core_read_addr not found");
send(JSON.stringify({type: "exit"}));
Script.stop();
}

console.log('AsyncNet.win32 base address : ' + async_net_base_addr);
console.log('asn_core_read addr : ' + asn_core_read_addr);

function im_received(event, hid, tag, addr, size) {
if (addr.isNull()) {
return;
}

send(
JSON.stringify({
'type': 'msg',
"buffer": Array.from(new Uint8Array(addr.readByteArray(size))),
"event": event,
"hid": hid,
"tag": tag
})
);
}

Interceptor.attach(asn_core_read_addr, {
// args[0] : obj ptr
// args[1] : event ptr
// args[2] : wparam ptr
// args[3] : lparam ptr
// args[4] : buffer ptr
// args[5] : buffer size
onEnter(args) {
this.event_ptr = args[1];
this.wparam_ptr = args[2];
this.lparam_ptr = args[3];
this.buffer_ptr = args[4];
},
onLeave(retval) {
if (retval.toInt32() > 0) {
im_received(
this.event_ptr.readS32(),
this.wparam_ptr.readS32(),
this.lparam_ptr.readS32(),
this.buffer_ptr,
parseInt(retval, 16)
);
}
}
});

""")
script.on('message', on_receive_im_message)
script.load()

print("[!] Ctrl+Z and enter to detach and exit.\n\n")
sys.stdin.read()

frida_session.detach()

if __name__ == '__main__':
if len(sys.argv) != 2:
print("Usage: %s <process name or PID>" % __file__)
sys.exit(1)

try:
target_process = int(sys.argv[1])
except ValueError:
target_process = sys.argv[1]

logging.basicConfig(
level=logging.INFO,
format="%(asctime)s.%(msecs)03d: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)

main(target_process)
1
2
3
# 启动方法
python hook_asn_core_read.py <lieyou.exe 或 pid>
# 退出按ctrl + z后回车
请我喝瓶肥仔快乐水?

欢迎关注我的其它发布渠道