整个APP数据加解密可以分为:
1.前期的代码分析、hook点找寻(有的可能需要Frida反调绕过、抓包绕过等)、针对加解密hook函数编写Frida脚本将加解密函数导出。
2.后期的burp.py脚本(bp插件)编写、server.py编写。根据从burp.py中得到的数据以及Frida脚本hook的加解密方法,编写相关逻辑代码,编写数据测试单元。
3.测试成功之后将得到解密明文、修改后的加密密文flask/ xmlrpc的方式返回给burp.py插件。burp插件将数据展示在对应的tab标签。
👀加解密函数寻找
有时根据加密报文特征,例如报文可能为个键值对,可以根据密文的键名在编译后的程序里去寻找,进而进一步交叉引用找到相关加解密函数。找到加解密函数之后使用Frida对其进行hook,通过向其传入明文(加密)获取到请求包加密结果。,或向其传入密文(解密)获取响应包解密结果。如果是非对称加密,为了可以实现对请求包的解密需要拦截请求包加密函数,通过输出日志的形式将明文参数和密文结果传到server.py存储到数据库建立映射关系。当需要对请求包解密时从数据库根据密文查询明文即可。
有时候虽然找到了加解密函数,但是他们的参数(即要加解密的报文)不是我们抓获的格式,需要进行一些处理,这时候就需要在Frida的脚本中也加入类似这种格式处理的方法。可能要使用某个类的某个方法。这个时候就需要主动调用该方法,然后尽可能地将其写成简明的函数形式,然后直接在Frida脚本中进行调用。
1、获取请求包,对请求包数据进行解密,展示在请求包的插件标签。
2、对请求包插件标签的内容进行修改,对修改后的数据进行加密后,展示在请求包原始标签内。
from burp import IBurpExtender
from burp import IMessageEditorTab
from burp import IMessageEditorTabFactory
from java.io import PrintWriter
import json, time, base64, httplib
import re
import hashlib
import sys
# config_dict
config_dict = {
"pkgname" : "test",
"_api" : "127.0.0.1:31337",
}
class BurpExtender(IBurpExtender,IMessageEditorTabFactory):
def registerExtenderCallbacks(self, callbacks):
self.stdout = PrintWriter(callbacks.getStdout(), True)
self.stderr = PrintWriter(callbacks.getStderr(), True)
self._callbacks = callbacks
self._helpers = callbacks.getHelpers()
callbacks.setExtensionName(config_dict["pkgname"])
callbacks.registerMessageEditorTabFactory(self)
return
def createNewInstance(self, controller, editable):
return Display_data(self, controller, editable)
class Display_data(IMessageEditorTab):
def __init__(self, extender, controller, editable):
self._helpers = extender._helpers
self._txtInput = extender._callbacks.createTextEditor()
self._extender = extender
def getUiComponent(self):
return self._txtInput.getComponent()
def getTabCaption(self):
return config_dict["pkgname"]
def isEnabled(self, content, isRequest):
return True
def setMessage(self, content, isRequest):
self._txtInput.setText("Loading ......")
_output = ""
self._content = content
if (content is None):
self._txtInput.setText(None)
self._txtInput.setEditable(False)
return
if isRequest:
if(self._txtInput.isTextModified()):
return
info = self._helpers.analyzeResponse(content)
body = content[info.getBodyOffset():].tostring()
header = info.getHeaders()
header_s = '\r\n'.join(header)
endata = {"body":base64.b64encode(body),"header":base64.b64encode(header_s)}
_resp = json.loads(self.api_req("get_req",json.dumps(endata)))
if (_resp['status']!="ok"):
# print(_resp)
return
request_bytes = self._helpers.stringToBytes(_resp["data"]["output"])
_output = self._helpers.buildHttpMessage(header, request_bytes)
else:
info = self._helpers.analyzeResponse(content)
body = content[info.getBodyOffset():].tostring()
header = info.getHeaders()
header_s = '\r\n'.join(header)
endata = {"body":base64.b64encode(body),"header":base64.b64encode(header_s)}
_resp = json.loads(self.api_req("decrypt",json.dumps(endata)))
print(_resp)
if (_resp['status']!="ok"):
# print(_resp)
return
request_bytes = self._helpers.stringToBytes(_resp["data"]["output"])
_output = self._helpers.buildHttpMessage(header, request_bytes)
self._txtInput.setEditable(False)
self._txtInput.setText(_output)
return
def getSelectedData(self):
return self._txtInput.getSelectedText()
def isModified(self):
return self._txtInput.isTextModified()
def getMessage(self):
if(self._txtInput.isTextModified()):
raw = self._txtInput.getText()
info = self._helpers.analyzeResponse(raw)
body = raw[info.getBodyOffset():].tostring()
header = info.getHeaders()
header_s = '\r\n'.join(header)
data = {"body":base64.b64encode(body),"header":base64.b64encode(header_s)}
_resp = json.loads(self.api_req("encrypt", json.dumps(data)))
if (_resp['status']!="ok"):
print("_resp")
return
request_bytes = self._helpers.stringToBytes(_resp["data"]["output"])
return self._helpers.buildHttpMessage(header, request_bytes)
else:
_origin = self._content
return _origin
def api_req(self, api, data):
try:
headers = {"Content-type": "APPlication/json","Accept": "text/plain"}
conn = httplib.HTTPConnection(config_dict["_api"])
conn.request("POST", "/api/v1/"+api, data, headers)
resp = conn.getresponse()
assert resp.status == 200
res_data = resp.read()
conn.close()
return res_data
except Exception as e:
return {"status":"error","data":e.__str__()}
实现两个请求响应接口:
1、encrypt接口:对获取到的body数据进行加密后返回。
2、decrypt接口:对获取到的body数据进行解密后返回。
注意:这里并没有给出hook具体加解密函数代码。具体代码可以写到js中然后加载即可
server.py实现代码:
import gzip
import json
import logging
import Frida
import tinydb
import base64
import time
import threading
from queue import Queue
from flask import Flask, request, jsonify
APP = Flask(__name__)
db = tinydb.TinyDB("backup.bin")
_backup = tinydb.Query()
q = Queue()
@APP.route('/')
def hello():
return "hello world"
def insert2db():
for _line in iter(q.get, None):
db.insert(_line)
logger = logging.getLogger("Frida")
def on_message(message, data):
if message['payload'] and "log_" in message['payload']:
_data = json.loads(message['payload'])
if "encrypt" in message['payload']:
q.put(_data)
else:
logger.info(message)
def init_Frida():
manager = Frida.get_device_manager()
flag = True
while flag:
try:
usb_device = manager.get_usb_device(10)
flag = False
except:
pass
logger.info(usb_device)
pid = usb_device.spawn(pkgname)
session = usb_device.attach(pid)
global script
script = session.create_script(
gzip.decompress(base64.b64decode(_magic)).decode())
script.on('message', on_message)
script.load()
script.exports.init()
usb_device.resume(pid)
def search_req_input_from_db(_input):
rows = db.search(_backup.output==_input)
if rows:
return rows[-1]['input']
else:
return _input
@APP.route('/api/v1/get_req', methods=['POST'])
def search_req():
# {"data": post_data}
_input = request.json['body']
return jsonify({"status": "ok", "data": {"output": search_req_input_from_db(base64.b64decode(_input).decode())}})
@APP.route('/api/v1/encrypt', methods=['POST'])
@APP.route('/api/v1/encrypt/', methods=['POST'])
def encrypt_req(key="AAAAAAAAAAAAAAAA"):
# {"data": post_data}
_input = request.json['body']
print(_input)
print(jsonify({"status": "ok", "data": {"output": script.exports.req_encrypt(base64.b64decode(_input).decode())}}))
return jsonify({"status": "ok", "data": {"output": script.exports.req_encrypt(base64.b64decode(_input).decode())}})
# {}.keys = [_method, _urlpath, _timestamp, _token, _params]
@APP.route('/api/v1/decrypt', methods=['POST'])
@APP.route('/api/v1/decrypt/', methods=['POST'])
def decrypt_resp(key="AAAAAAAAAAAAAAAA"):
# {"data": post_data}
_input = (request.json)['body']
print(_input)
return jsonify({"status": "ok", "data": {"output": script.exports.resp_decrypt(base64.b64decode(_input).decode())}})
def test_all():
# init_Frida()
# threading.Thread(target=insert2db).start()
# time.sleep(3) # wait for Frida ready
with APP.test_client() as c:
# decrypt
print("[!] testing decrypt")
_output = "eyJNU0ciOiLmgqjovpPlhaXnmoTnlKjmiLflkI3miJblr4bnoIHplJnor6/vvIEiLCJTVEFUVVMiOiJ2YWxpZGF0aW9uLmxvZ2luX3VzZXJfbm90X2V4aXN0In0="
_input = "{\"MSG\":\"\u60a8\u8f93\u5165\u7684\u7528\u6237\u540d\u6216\u5bc6\u7801\u9519\u8bef\uff01\",\"STATUS\":\"validation.login_user_not_exist\"}"
info = c.post("/api/v1/decrypt", json={"data":_output})
info = info.json
assert info['status'] == "ok"
print(info['data']['output'] == _input)
# encrypt
print("[!] testing encrypt")
info = c.post("/api/v1/encrypt",json = {"data":_input})
info = info.json
assert info['status'] == "ok"
print(info['data']['output'] ==_output)
if __name__ == '__main__':
# init_Frida()
# threading.Thread(target=insert2db).start()
APP.run(host="127.0.0.1",port="31337")
# print(script.exports.resp_decrypt("eyJNU0ciOiLmgqjovpPlhaXnmoTnlKjmiLflkI3miJblr4bnoIHplJnor6/vvIEiLCJTVEFUVVMiOiJ2YWxpZGF0aW9uLmxvZ2luX3VzZXJfbm90X2V4aXN0In0="))
# test_all()
关于APP测试非http协议测试:在APP测试过程中会遇到数据包请求非http这种情况,以及前面提到的非对称加密拦截修改响应包等操作。这种情况下如果能找到合适的hook点,然后使用下面这个工具也是可以完成目的的。具体原理和操作这个大佬README.md写的很清楚了,大家自行食用。
传送门:https://github.com/F6JO/mPaas-Frida-hook
web前端加解密这里也只是简单提一下,因为平时做的也比较少一些,主要是两个开源工具的介绍:
如果遇到复杂的加解密算法,比如某些在标准算法上魔改的算法,根本没法从JS中扣出关键代码的场景,就非常适合jsrpc的方式直接调用浏览器中的加解密函数。并且高级用法可以直接联动扫描器进行加密流量解密后的扫描工作。如何使用项目文档以及其衍生的介绍文档网上都有,大家搜搜探索一下就可以上手了
传送门:https://github.com/jxhczhl/JsRpc
1.首先需要开启 devtools设置中的Protocol Monitor⽤于监测cdp协议的调⽤记录
2.然后寻找加解密函数,这里以加密函数为例:
a.原文中给出了寻找加密函数的一个思路。通常我们要找的都是登录相关的数据包加解密。所以我们直接打开控制台的network选项卡
b.直接在登录数据包的启动器中选择submit那个链接,可以直接定位到数据包发送相关函数代码,然后再在函数调用栈中进行追溯寻找,寻找过程多打几个断点看作用域里的代码变量参数变化来不断逼近加解密函数的大概位置。
3.本例中的加密函数如下:
4.启动remotejs_windows_amd64.exe 参数实例如下:
remotejs -h
GLOBAL OPTIONS:
--url value, -u value open url when open chrome, default blank url
--chrome-path value, --cp value use specified chrome path
--proxy value set proxy for browser
--remote-debug-address value use remote chrome debugging
--web-listen value web server port (default: "8088")
--help, -h show help
./remotejs # 打开一个空白的浏览器
./remotejs -u [URL] # 打开一个浏览器,并加载指定url
./remotejs --remote-debug-address "ws://127.0.0.1:9222" # 指定一个远程浏览器(需要目标开remote-debugger-port)
我们这里以远程调试地址举例:
本地先启动一个开启了远程调试端口的浏览器,这个是要调试的浏览器。也就是在该浏览器上访问网站找到加密函数,然后在该函数的作用域上打断点进行调试。具体如下列图片所示:
然后点击运行remotejs_windows_amd64.exe 加上 --remote-debug-address "ws://127.0.0.1:9221"参数连接到我们刚才打开的浏览器
调用结果:
完结,给大佬们撒花!!!