目录

w13scan源码解析

源码主体目录结构

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 有省略.
├── api
├── certs
├── data
├── fingprints # 指纹插件目录
│   ├── framework
│   ├── os
│   ├── programing
│   └── webserver
├── lib # 核心源码目录
│   ├── api
│   ├── controller
│   ├── core
│   ├── helper
│   ├── parse
│   ├── proxy
│   └── reverse
├── scanners # 扫描插件
│   ├── PerFile
│   ├── PerFolder
│   └── PerServer
└── thirdpart 
│    └── requests
├── reverse.py  # 反连平台
├── config.py   # 配置文件
└── w13scan.py  # 入口文件

入口文件

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
# w13scan.py
def main():
    """Program entry: initialize, then dispatch to active or passive scanning."""
    version_check()  # verify the running Python version

    # init
    root = modulePath()          # project root directory
    cmdline = cmd_line_parser()  # parse the command line
    init(root, cmdline)          # initialize options and write them into conf

    if conf.url or conf.url_file:  # active scan
        ...
    elif conf.server_addr:  # passive scan (detailed below)
        ...

初始化做了什么

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
# lib/core/option.py
def init(root, cmdline):
    """Initialize the scanner: paths, config, plugins and stdout patching."""
    cinit(autoreset=True)
    setPaths(root)   # set up the directory-structure paths
    banner()
    _init_conf()  # read configuration values from config.py
    _merge_options(cmdline)  # merge command-line options over the config values
    _set_conf()          # apply ua / proxy / server_addr settings
    initKb()            # initialize the shared knowledge base (see code below)
    initPlugins()  # load plugins (scanners + fingerprints) into KB["registered"][plugin] and KB["fingerprint"][name]
    _init_stdout() 
    patch_all()  # global patches: disable_warnings, and a re-implemented requests Session.request (thirdpart/requests/__init__.py)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
# lib/core/option.py
def initKb():
    """Populate KB, the global knowledge base shared by all worker threads."""
    KB['continue'] = False  # keep worker threads looping while True
    KB['registered'] = dict()  # registered vulnerability-scan plugins
    KB['fingerprint'] = dict()  # registered fingerprint plugins
    KB['task_queue'] = Queue()  # queue of pending scan tasks
    KB["spiderset"] = SpiderSet()  # de-duplication set for crawled URLs
    KB["console_width"] = getTerminalSize()  # terminal width, for progress output
    KB['start_time'] = time.time()  # scan start timestamp
    KB["lock"] = threading.Lock()  # guards the shared counters below
    KB["output"] = OutPut()
    KB["running_plugins"] = dict()
    KB['finished'] = 0  # number of finished tasks
    KB["result"] = 0  # number of findings
    KB["running"] = 0  # number of tasks currently running

被动扫描

开启漏洞扫描器线程,并设置setDaemon为True,开启代理服务器,调用serve_forever()方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
	# w13scan.py
	elif conf.server_addr: 
				KB["continue"] = True  # 设置进程一直持续,不间断
        # 启动漏洞扫描器
        scanner = threading.Thread(target=start)   #start函数,见下面详解
        scanner.setDaemon(True)
        scanner.start()
        # 启动代理服务器
        baseproxy = AsyncMitmProxy(server_addr=conf.server_addr, https=True)

        try:
            baseproxy.serve_forever()
        except KeyboardInterrupt:
            scanner.join(0.1)
            threading.Thread(target=baseproxy.shutdown, daemon=True).start()
            deinit()
            print("\n[*] User quit")
        baseproxy.server_close()

代理实现是基于httpserver,实现http.server.BaseHTTPRequestHandler的do_XXX方法来处理请求,第一次会经由代理(如果有)访问一次网站,获得第一次response,然后再经过一个判断,如果是一些静态资源,比如jpg、zip等则不放行,也就是会忽略(但是js是放行的),具体实现如下;不是静态资源的话,封装request、response推进队列进行扫描:KB['task_queue'].put(('loader', req, resp))

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# lib/proxy/baseproxy.py
def _is_replay(self):
    """
    Decide whether this request should be filtered out (i.e. NOT scanned).
    :return: True when the target path ends with a static-resource extension
    """
    # Strip any query string, then test the path against the blacklist.
    target = (self._target or self.path).split("?", 1)[0]
    return any(target.endswith(ext) for ext in notAcceptedExt)

# lib/core/settings.py
# Static-resource extensions: a proxied response whose URL path ends with
# one of these is dropped instead of being queued for scanning.
# (.js is intentionally absent — scripts are still let through.)
notAcceptedExt = [
    ".css",
    ".jpg",
    ".jpeg",
    ".png",
    ".gif",
    ".wmv",
    ".a3c",
    ".ace",
    ".aif",
    ...
]

漏洞扫描函数

在被动扫描器开始start()函数的时候,开启31个线程同时处理,通过KB["task_queue"].get()取任务,然后调用插件的execute方法,这个插件是主插件,具体见如下分析

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# lib/controller/controller.py
def exception_handled_function(thread_function, args=()):
    """Run ``thread_function(*args)``, shielding the worker thread from crashes.

    A KeyboardInterrupt clears the global continue flag and propagates so the
    whole scan shuts down; any other exception is printed and swallowed so the
    thread pool keeps running.
    """
    try:
        thread_function(*args)
    except KeyboardInterrupt:
        KB["continue"] = False  # tell every worker loop to stop
        raise
    except Exception:
        traceback.print_exc()  # report, but keep the worker alive

def run_threads(num_threads, thread_function, args: tuple = ()):
    """Start *num_threads* daemon worker threads and block until they all exit.

    Each thread runs ``exception_handled_function(thread_function, args)``.
    A KeyboardInterrupt clears KB['continue'] and propagates; other errors
    are logged.  A trailing newline is always written to stdout.

    :param num_threads: how many worker threads to spawn
    :param thread_function: callable each worker executes
    :param args: positional arguments passed to thread_function
    """
    threads = []

    try:
        # fix: message typo ("Staring" -> "Starting")
        info_msg = "Starting [#{0}] threads".format(num_threads)
        logger.info(info_msg)

        # Start the threads.
        # fix: the original loop variable shadowed the num_threads parameter.
        for thread_id in range(num_threads):
            thread = threading.Thread(target=exception_handled_function, name=str(thread_id),
                                      args=(thread_function, args))
            thread.daemon = True  # modern spelling of setDaemon(True)
            try:
                thread.start()
            except Exception as ex:
                err_msg = "error occurred while starting new thread ('{0}')".format(str(ex))
                logger.critical(err_msg)
                break

            threads.append(thread)

        # Poll until every thread has finished; the short sleep keeps the
        # main thread responsive to Ctrl-C while waiting.
        alive = True
        while alive:
            alive = False
            for thread in threads:
                if thread.is_alive():
                    alive = True
                    time.sleep(0.1)

    except KeyboardInterrupt:
        KB['continue'] = False
        raise

    except Exception as ex:
        logger.error("thread {0}: {1}".format(threading.current_thread().name, str(ex)))
        traceback.print_exc()
    finally:
        dataToStdout('\n')

def start():
    """Scanner entry point: run conf.threads worker threads executing task_run."""
    run_threads(conf.threads, task_run)

def task_run():
    """Worker loop: consume tasks from KB["task_queue"] and run the plugin.

    Loops while the global continue flag is set or the queue still holds
    pending tasks.  NOTE(review): task_queue.get() blocks, so a worker could
    hang here if the queue drains between the while-check and the get —
    verify against upstream behavior.
    """
    while KB["continue"] or not KB["task_queue"].empty():
        poc_module_name, request, response = KB["task_queue"].get()  # one task = plugin name, request, response
        KB.lock.acquire()  # lock: the counters below are shared across workers
        KB.running += 1 
        if poc_module_name not in KB.running_plugins:
            KB.running_plugins[poc_module_name] = 0
        KB.running_plugins[poc_module_name] += 1
        KB.lock.release()  # unlock
        printProgress()  # refresh the progress line
        poc_module = copy.deepcopy(KB["registered"][poc_module_name]) #  deep-copy the plugin instance so concurrent runs don't share state
        poc_module.execute(request, response)  # run the plugin's execute() on this task
        KB.lock.acquire()  # lock
        KB.finished += 1   
        KB.running -= 1
        KB.running_plugins[poc_module_name] -= 1
        if KB.running_plugins[poc_module_name] == 0:
            del KB.running_plugins[poc_module_name]

        KB.lock.release() #  unlock
        printProgress()
    printProgress()

插件入口

解析url,判断是什么语言的站,比如asp、aspx、php、java,然后进行指纹扫描,指纹扫描主要是针对response的header跟body来进行关键词查询,查询到写入response的name属性

针对参数: PerFile目录下 通用的检查模块 xss sql注入 s2 shiro 等

针对域名: PerServer目录下 备份文件 错误页面 等

针对url:分离出目录 然后每一个url目录都创建一个 PerFolder 任务 备份目录 phpinfo 源码泄露 等

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
def audit(self):
    """PerFile dispatcher: fingerprint the response, then queue PerFile /
    PerServer / PerFolder scan tasks for this request (de-duplicated via
    KB["spiderset"]).
    """
    headers = self.requests.headers
    url = self.requests.url
    p = urlparse(url)
    if not p.netloc:
        return
    # Honor the exclusion rules from the configuration.
    for rule in conf.excludes:
        if rule in p.netloc:
            logger.info("Skip domain:{}".format(url))
            return

    # fingerprint basic info: infer platform/OS from the URL's file extension.
    # (fix: the transcription had `exi = getattr`, comparing the builtin to a
    # string — that branch could never match; derive the extension instead)
    import os
    exi = os.path.splitext(p.path)[1].lower()
    if exi == ".asp":
        self.response.programing.append(WEB_PLATFORM.ASP)
        self.response.os.append(OS.WINDOWS)
    elif exi == ".aspx":
        self.response.programing.append(WEB_PLATFORM.ASPX)
        self.response.os.append(OS.WINDOWS)
    elif exi == ".php":
        self.response.programing.append(WEB_PLATFORM.PHP)
    elif exi in (".jsp", ".do", ".action"):
        self.response.programing.append(WEB_PLATFORM.JAVA)

    # Run each fingerprint plugin group whose result slot is still empty;
    # a plugin may return a single string or a list-like of strings.
    for name, values in KB["fingerprint"].items():
        if not getattr(self.response, name):
            _result = []
            for mod in values:
                m = mod.fingerprint(self.response.headers, self.response.text)
                if isinstance(m, str):
                    _result.append(m)
                if isListLike(m):
                    _result += list(m)
            if _result:
                setattr(self.response, name, _result)

    # Fingerprint basic end
    if KB["spiderset"].add(url, 'PerFile'):
        task_push('PerFile', self.requests, self.response)

    # Send PerServer: one task per scheme://netloc
    # (p was already parsed above; no need to re-parse the same url)
    domain = "{}://{}".format(p.scheme, p.netloc)
    if KB["spiderset"].add(domain, 'PerServer'):
        req = requests.get(domain, headers=headers, allow_redirects=False)
        fake_req = FakeReq(domain, headers, HTTPMETHOD.GET, "")
        fake_resp = FakeResp(req.status_code, req.content, req.headers)
        task_push('PerServer', fake_req, fake_resp)

    # Collect parent directories from the URL and queue a PerFolder task
    # for each directory not seen before.
    urls = set(get_parent_paths(url))
    for parent_url in urls:
        if not KB["spiderset"].add(parent_url, 'get_link_directory'):
            continue
        req = requests.get(parent_url, headers=headers, allow_redirects=False)
        if KB["spiderset"].add(req.url, 'PerFolder'):
            fake_req = FakeReq(req.url, headers, HTTPMETHOD.GET, "")
            fake_resp = FakeResp(req.status_code, req.content, req.headers)
            task_push('PerFolder', fake_req, fake_resp)

每次add任务的时候会调用url_etl函数对url进行泛化

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class SpiderSet(object):
		....
		def add(self, url, plugin):
		        """
		        Add (url, plugin) to the spider set.

		        Returns True when it was added (not a duplicate of any
		        already-seen generalized URL), False when it is a duplicate.
		        :param url:
		        :param plugin:
		        :return:bool
		        """
		        ret = True
		        if not (isinstance(url, str) and isinstance(plugin, str)):
		            url = str(url)
		            plugin = str(plugin)
		
		        self.lock.acquire()
		        if plugin not in self.spider_list:
		            self.spider_list[plugin] = {}
		        netloc = urlparse.urlparse(url).netloc
		        if netloc not in self.spider_list[plugin]:
		            self.spider_list[plugin][netloc] = []
		        etl = url_etl(url)  # generalized (templated) form of the url
		        score = 0
		        # count how many stored patterns are NOT similar to this one
		        for etl_url in self.spider_list[plugin][netloc]:
		            if not url_compare(etl, etl_url):
		                score += 1
		        # record it only when it differs from every stored pattern
		        if score == len(self.spider_list[plugin][netloc]):
		            self.spider_list[plugin][netloc].append(etl)
		        else:
		            ret = False
		        self.lock.release()
		        return ret

泛化的主要思想就是替换字符

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def etl(str, onlyNUM=False):
    '''
    Generalize a string: letters -> 'A', digits -> 'N', special symbols
    (those listed in Chars) -> 'T', anything else -> 'C'.

    :param str: input string (name kept for API compatibility, even though
                it shadows the builtin)
    :param onlyNUM: when True, only digits are replaced (with 'N'); every
                    other character is kept, lower-cased
    :return: the generalized string
    '''
    # fix: the original re-tested `not onlyNUM` inside the `if not onlyNUM`
    # branch (always true there), and built the result with quadratic `+=`.
    out = []
    for c in str:
        c = c.lower()
        if '0' <= c <= '9':
            out.append('N')
        elif onlyNUM:
            out.append(c)  # onlyNUM mode keeps non-digits as-is (lower-cased)
        elif 'a' <= c <= 'z':
            out.append('A')
        elif c in Chars:
            out.append('T')
        else:
            out.append('C')
    return "".join(out)

通过对比两个url的相似度,用的是Simhash算法

1
2
3
4
5
6
def url_compare(url, link):
    """Return True when two (generalized) URLs are near-duplicates.

    Compares the Hamming distance between the Simhash fingerprints of the
    two strings; fewer than 5 differing bits counts as the same pattern.
    """
    # Simhash distance is a non-negative bit count, so only the upper bound
    # matters (the original "-2 < dis" lower bound was vacuous), and the
    # comparison already yields the bool we want.
    return Simhash(url).distance(Simhash(link)) < 5

各模块的分析

PerFile

analyze_parameter

对params、post、cookies各参数的value进行判断是否是反序列化数据。举个例子,对于java的反序列化数据,如果检测到ro0ab开头,会对其进行base64解码,然后判断是否是aced0005开头,以此判断是否是java的反序列化数据

1
2
3
4
5
6
if isJavaObjectDeserialization(v):
    whats = "JavaObjectDeserialization"
elif isPHPObjectDeserialization(v):
    whats = "PHPObjectDeserialization"
elif isPythonObjectDeserialization(v):
    whats = "PythonObjectDeserialization"
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
def isJavaObjectDeserialization(value, isJava=True):
    """
    Heuristic: does *value* look like a base64-encoded Java serialized object?

    Java serialization streams begin with the bytes AC ED 00 05, whose
    base64 encoding starts with "rO0AB".
    """
    if len(value) < 10:
        return False
    if value[0:5].lower() != "ro0ab":
        return False
    decoded = is_base64(value, isJava)
    if not decoded:
        return False
    return bool(decoded.startswith(bytes.fromhex("ac ed 00 05")))

backup_file

解析url大概如下,挨个访问,如果response是200,会读取返回内容的前10个字节,与常见压缩包文件头的hex进行比对,如果吻合,判断备份文件存在;或者返回的Content-Type为application/octet-stream时,也判断备份文件存在

http://xxxxx.com/index.php => index.php.bak index.bak index.rar

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
headers = self.requests.headers
url = self.requests.url

a, b = os.path.splitext(url)
if not b:
    return
payloads = []
payloads.append(a + ".bak")
payloads.append(a + ".rar")
payloads.append(a + ".zip")
payloads.append(url + ".bak")
payloads.append(url + ".rar")
payloads.append(url + ".zip")
for payload in payloads:
		r = requests.get(payload, headers=headers, allow_redirects=False)
		if r.status_code == 200:
				try:
            content = r.raw.read(10)
        except:
            continue
				if self._check(content) or "application/octet-stream" in r.headers.get("Content-Type", ''):
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def _check(self, content):
        """
            根据给定的url,探测远程服务器上是存在该文件
            文件头识别
           * rar:526172211a0700cf9073
           * zip:504b0304140000000800
           * gz:1f8b080000000000000b,也包括'.sql.gz',取'1f8b0800' 作为keyword
           * tar.gz: 1f8b0800
           * mysqldump:                   -- MySQL dump:               2d2d204d7953514c
           * phpMyAdmin:                  -- phpMyAdmin SQL Dump:      2d2d207068704d794164
           * navicat:                     /* Navicat :                 2f2a0a204e617669636174
           * Adminer:                     -- Adminer x.x.x MySQL dump: 2d2d2041646d696e6572
           * Navicat MySQL Data Transfer: /* Navicat:                  2f2a0a4e617669636174
           * 一种未知导出方式:               -- -------:                  2d2d202d2d2d2d2d2d2d
            :param target_url:
            :return:
        """
        features = [b'\x50\x4b\x03\x04', b'\x52\x61\x72\x21',
                    b'\x2d\x2d\x20\x4d', b'\x2d\x2d\x20\x70\x68', b'\x2f\x2a\x0a\x20\x4e',
                    b'\x2d\x2d\x20\x41\x64', b'\x2d\x2d\x20\x2d\x2d', b'\x2f\x2a\x0a\x4e\x61']
        for i in features:
            if content.startswith(i):
                return True
        return False

command_php_code、command_asp_code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
_payloads = [
            "print(md5({}));".format(randint),
            ";print(md5({}));".format(randint),
            "';print(md5({}));$a='".format(randint),
            "\";print(md5({}));$a=\"".format(randint),
            "${{@print(md5({}))}}".format(randint),
            "${{@print(md5({}))}}\\".format(randint),
            "'.print(md5({})).'".format(randint)
					  ]
# 载入处理位置以及原始payload
iterdatas = self.generateItemdatas()  //会返回get post cookie三个地方的参数及值
errors = None
errors_raw = ()
# 根据原始payload和位置组合新的payload
for origin_dict, positon in iterdatas:
    payloads = self.paramsCombination(origin_dict, positon, _payloads)
		r = self.req(positon, payload)
		if not r:
        continue
    html1 = r.text
    if verify_result in html1:
				...
		if re.search(regx, html1, re.I | re.S | re.M):
				...
		
		if not errors:
       errors = sensitive_page_error_message_check(html1)
       if errors:
          errors_raw = (key, value)

生成的payload如下,挨个请求,做如下三个判断

  1. 判断返回的页面是否包含verify_result,这个result是由生成的随机数字相乘得到的结果
  2. 对返回页面进行报错正则匹配,可能是由于符号闭合问题,导致的报错,正则regx = 'Parse error: syntax error,.*?\sin\s'
  3. 对返回页面是否由敏感信息报错出现,比如一些mysql连接信息、django调试信息等

command_asp_code相比command_php_code只是简单地做了判断1,没有做判断2跟3,这里就不分析了

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
PHPSESSID=print(md5(7298));; security=low;
PHPSESSID=in8btjut5kedoi1mlmta2227l5; security=print(md5(7298));;
PHPSESSID=;print(md5(7298));; security=low;
PHPSESSID=in8btjut5kedoi1mlmta2227l5; security=;print(md5(7298));;
PHPSESSID=';print(md5(7298));$a='; security=low;
PHPSESSID=in8btjut5kedoi1mlmta2227l5; security=';print(md5(7298));$a=';
PHPSESSID=";print(md5(7298));$a="; security=low;
PHPSESSID=in8btjut5kedoi1mlmta2227l5; security=";print(md5(7298));$a=";
PHPSESSID=${@print(md5(7298))}; security=low;
PHPSESSID=in8btjut5kedoi1mlmta2227l5; security=${@print(md5(7298))};
PHPSESSID=${@print(md5(7298))}\; security=low;
PHPSESSID=in8btjut5kedoi1mlmta2227l5; security=${@print(md5(7298))}\;
PHPSESSID='.print(md5(7298)).'; security=low;
PHPSESSID=in8btjut5kedoi1mlmta2227l5; security='.print(md5(7298)).';

command_system

主要是三个payload:set|set&set、echo `echo 6162983|base64`、ping -nc 1 {},前两个是针对有回显的情况,最后一个是针对无回显,通过dns的方式来探测,借助的是dnslog.cn

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
url_flag = {
            "set|set&set": [
                'Path=[\s\S]*?PWD=',
                'Path=[\s\S]*?PATHEXT=',
                'Path=[\s\S]*?SHELL=',
                'Path\x3d[\s\S]*?PWD\x3d',
                'Path\x3d[\s\S]*?PATHEXT\x3d',
                'Path\x3d[\s\S]*?SHELL\x3d',
                'SERVER_SIGNATURE=[\s\S]*?SERVER_SOFTWARE=',
                'SERVER_SIGNATURE\x3d[\s\S]*?SERVER_SOFTWARE\x3d',
                'Non-authoritative\sanswer:\s+Name:\s*',
                'Server:\s*.*?\nAddress:\s*'
            ],
            "echo `echo 6162983|base64`6162983": [
                "NjE2Mjk4Mwo=6162983"
            ]
        }
if OS.WINDOWS in self.response.os:
		del url_flag["echo `echo 6162983|base64`6162983".format(randint)]
dns = reverseApi()
if dns.isUseReverse():
    dnsdomain = dns.generate_dns_token()
    dns_token = dnsdomain["token"]
    fullname = dnsdomain["fullname"]
    reverse_payload = "ping -nc 1 {}".format(fullname)
    url_flag[reverse_payload] = []
iterdatas = self.generateItemdatas()
for origin_dict, positon in iterdatas:
		payloads = self.paramsCombination(origin_dict, positon, url_flag)

这里对命令进行了分隔符操作,一共四个分隔符号 ['', ';', "&&", "|"],接下来就是拼接好payload然后发起请求,对response做正则匹配,无回显的就对dnslog日志查询。

directory_traversal

对请求中参数value中存在.或者/ 以及参数名为filename file path filepath 会去尝试目录穿越的操作

if ("." in value or "/" in value) or (key.lower() in ['filename', 'file', 'path', 'filepath']):

payload方面,主要是如下集中如下

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
linux:
../../../../../../../../../../../etc/passwd%00
/etc/passwd

window: 
../../../../../../../../../../windows/win.ini
C:\\boot.ini
C:\\WINDOWS\\system32\\drivers\\etc\\hosts

java:
/WEB-INF/web.xml
../../WEB-INF/web.xml

判断存在目录遍历的依据有两种方式:

一个是明文匹配,一个是正则匹配

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
plainArray = [
            "; for 16-bit app support",
            "[MCI Extensions.BAK]",
            "# This is a sample HOSTS file used by Microsoft TCP/IP for Windows.",
            "# localhost name resolution is handled within DNS itself.",
            "[boot loader]"
        ]

regexArray = [
            '(Linux+\sversion\s+[\d\.\w\-_\+]+\s+\([^)]+\)\s+\(gcc\sversion\s[\d\.\-_]+\s)',
            '(root:\w:\d*:)',
            "System\.IO\.FileNotFoundException: Could not find file\s'\w:",
            "System\.IO\.DirectoryNotFoundException: Could not find a part of the path\s'\w:",
            "<b>Warning<\/b>:\s\sDOMDocument::load\(\)\s\[<a\shref='domdocument.load'>domdocument.load<\/a>\]:\s(Start tag expected|I\/O warning : failed to load external entity).*(Windows\/win.ini|\/etc\/passwd).*\sin\s<b>.*?<\/b>\son\sline\s<b>\d+<\/b>",
            "(<web-app[\s\S]+<\/web-app>)",
            "Warning: fopen\(",
            "open_basedir restriction in effect",
            '/bin/(bash|sh)[^\r\n<>]*[\r\n]',
            '\[boot loader\][^\r\n<>]*[\r\n]'
        ]

js_sensitive_content

没什么好分析的,主要是正则

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
regx = {
            # 匹配url
            "url": r'(\b|\'|")(?:http:|https:)(?:[\w/\.]+)?(?:[a-zA-Z0-9_\-\.]{1,})\.(?:php|asp|ashx|jspx|aspx|jsp|json|action|html|txt|xml|do)(\b|\'|")',
            # 匹配邮箱
            "邮箱信息": r'[a-zA-Z0-9_-]+@[a-zA-Z0-9_-]+(?:\.[a-zA-Z0-9_-]+)+',
            # 匹配token或者密码泄露
            # 例如token = xxxxxxxx, 或者"apikey" : "xssss"
            "Token或密码": r'\b(?:secret|secret_key|token|secret_token|auth_token|access_token|username|password|aws_access_key_id|aws_secret_access_key|secretkey|authtoken|accesstoken|access-token|authkey|client_secret|bucket|email|HEROKU_API_KEY|SF_USERNAME|PT_TOKEN|id_dsa|clientsecret|client-secret|encryption-key|pass|encryption_key|encryptionkey|secretkey|secret-key|bearer|JEKYLL_GITHUB_TOKEN|HOMEBREW_GITHUB_API_TOKEN|api_key|api_secret_key|api-key|private_key|client_key|client_id|sshkey|ssh_key|ssh-key|privatekey|DB_USERNAME|oauth_token|irc_pass|dbpasswd|xoxa-2|xoxrprivate-key|private_key|consumer_key|consumer_secret|access_token_secret|SLACK_BOT_TOKEN|slack_api_token|api_token|ConsumerKey|ConsumerSecret|SESSION_TOKEN|session_key|session_secret|slack_token|slack_secret_token|bot_access_token|passwd|api|eid|sid|api_key|apikey|userid|user_id|user-id)["\s]*(?::|=|=:|=>)["\s]*[a-z0-9A-Z]{8,64}"?',
            # 匹配IP地址
            "IP地址": r'\b(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b',
            # 匹配云泄露
            "Cloudfront云泄露": r'[\w]+\.cloudfront\.net',
            "Appspot云泄露": r'[\w\-.]+\.appspot\.com',
            "亚马逊云泄露": r'[\w\-.]*s3[\w\-.]*\.?amazonaws\.com\/?[\w\-.]*',
            "Digitalocean云泄露": r'([\w\-.]*\.?digitaloceanspaces\.com\/?[\w\-.]*)',
            "Google云泄露": r'(storage\.cloud\.google\.com\/[\w\-.]+)',
            "Google存储API泄露": r'([\w\-.]*\.?storage.googleapis.com\/?[\w\-.]*)',
            # 匹配手机号
            "手机号": r'(?:139|138|137|136|135|134|147|150|151|152|157|158|159|178|182|183|184|187|188|198|130|131|132|155|156|166|185|186|145|175|176|133|153|177|173|180|181|189|199|170|171)[0-9]{8}',
            # 匹配域名
            # "域名泄露": r'((?:[a-zA-Z0-9](?:[a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?\.)+(?:biz|cc|club|cn|com|co|edu|fun|group|info|ink|kim|link|live|ltd|mobi|net|online|org|pro|pub|red|ren|shop|site|store|tech|top|tv|vip|wang|wiki|work|xin|xyz|me))',

            # SSH 密钥
            "SSH密钥": '([-]+BEGIN [^\\s]+ PRIVATE KEY[-]+[\\s]*[^-]*[-]+END [^\\s]+ '
                     'PRIVATE KEY[-]+)',

            # access_key
            "Access Key": 'access_key.*?["\'](.*?)["\']',
            "Access Key ID 1": 'accesskeyid.*?["\'](.*?)["\']',
            "Access Key ID 2": 'accesskeyid.*?["\'](.*?)["\']',

            # 亚马逊 aws api 账号 密钥
            "亚马逊AWS API": 'AKIA[0-9A-Z]{16}',
            "亚马逊AWS 3S API 1": 's3\\.amazonaws.com[/]+|[a-zA-Z0-9_-]*\\.s3\\.amazonaws.com',
            "亚马逊AWS 3S API 2": '([a-zA-Z0-9-\\.\\_]+\\.s3\\.amazonaws\\.com|s3://[a-zA-Z0-9-\\.\\_]+|s3-[a-zA-Z0-9-\\.\\_\\/]+|s3.amazonaws.com/[a-zA-Z0-9-\\.\\_]+|s3.console.aws.amazon.com/s3/buckets/[a-zA-Z0-9-\\.\\_]+)',
            "亚马逊AWS 3S API 3": 'amzn\\\\.mws\\\\.[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}',

            # author 信息
            "作者信息": '@author[: ]+(.*?) ',
            "API": 'api[key|_key|\\s+]+[a-zA-Z0-9_\\-]{5,100}',
            "基础信息": 'basic [a-zA-Z0-9=:_\\+\\/-]{5,100}',
            "Bearer": 'bearer [a-zA-Z0-9_\\-\\.=:_\\+\\/]{5,100}',

            # facebook token
            "Facebook Token": 'EAACEdEose0cBA[0-9A-Za-z]+',
            # github token
            "Github Token": '[a-zA-Z0-9_-]*:[a-zA-Z0-9_\\-]+@github\\.com*',
            # google api
            "Google API": 'AIza[0-9A-Za-z-_]{35}',
            # google captcha 验证
            "Google验证码": '6L[0-9A-Za-z-_]{38}|^6[0-9a-zA-Z_-]{39}$',
            # google oauth 权限
            "Google OAuth": 'ya29\\.[0-9A-Za-z\\-_]+',
            # jwt
            "JWT鉴权": 'ey[A-Za-z0-9-_=]+\\.[A-Za-z0-9-_=]+\\.?[A-Za-z0-9-_.+/=]*$',
            # mailgun 服务密钥
            "Mailgun服务密钥": 'key-[0-9a-zA-Z]{32}',
            # paypal braintree 访问凭证
            "Paypal/Braintree访问凭证": 'access_token\\$production\\$[0-9a-z]{16}\\$[0-9a-f]{32}',
            # PGP 密钥块
            "PGP密钥": '-----BEGIN PGP PRIVATE KEY BLOCK-----',
            # possible_creds
            "密码泄露": '(?i)(password\\s*[`=:\\"]+\\s*[^\\s]+|password '
                    'is\\s*[`=:\\"]*\\s*[^\\s]+|pwd\\s*[`=:\\"]*\\s*[^\\s]+|passwd\\s*[`=:\\"]+\\s*[^\\s]+)',

            # RSA
            "RSA密钥": '-----BEGIN EC PRIVATE KEY-----',
            # DSA
            "DSA密钥": '-----BEGIN DSA PRIVATE KEY-----',
            # stripe 账号泄露
            "Stripe账号泄露 1": 'rk_live_[0-9a-zA-Z]{24}',
            "Stripe账号泄露 2": 'sk_live_[0-9a-zA-Z]{24}',
            # twillio 账号泄露
            "Twillio 账号泄露 1": 'AC[a-zA-Z0-9_\\-]{32}',
            "Twillio 账号泄露 2": 'SK[0-9a-fA-F]{32}',
            "Twillio 账号泄露 3": 'AP[a-zA-Z0-9_\\-]{32}'
        }

jsonp

如果get参数名存在于callbacks = ["callback", "cb", "json"]中,会对response调用pyjsparser解析javascript

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
def check_sentive_content(self, resp: str) -> set:
    """Parse *resp* as JavaScript and return the set of sensitive values
    found among its string/number literals (empty set on parse failure)."""
    source = resp.strip()
    if not source:
        return set()
    if source.startswith("{"):
        # A bare JSON object is not a valid JS statement; wrap it in an
        # assignment so the parser accepts it.
        source = "d=" + source
    try:
        body = parse(source)["body"]
    except pyjsparser.pyjsparserdata.JsSyntaxError:
        return set()
    found = set()
    for literal in analyse_Literal(body):
        hit = self.info_search(literal)
        if hit:
            found.add(hit["content"])
    return found

主要是对ast中的node的type为Literal提取出来,然后加入list里

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
def analyse_Literal(node) -> list:
    """Recursively collect the string form of every truthy Literal value
    from a pyjsparser AST fragment (dict / list / scalar)."""
    collected = []
    if isinstance(node, dict):
        if node.get("type") == "Literal" and node.get("value"):
            collected.append(str(node["value"]))
        # Recurse into every child value; scalars contribute nothing.
        for child in node.values():
            collected.extend(analyse_Literal(child))
    elif isinstance(node, list):
        for element in node:
            collected.extend(analyse_Literal(element))
    return collected

然后对list的内容进行敏感信息探测,这里进行了两个探测,一个是敏感内容,还有一个是敏感参数,内容比如是银行卡、id卡、手机号、邮箱,敏感参数主要是username、mobilephone、email等,具体看代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
def info_search(self, text) -> dict:
    '''
    Search a piece of text for sensitive information.

    Two passes: first the content detectors (bank card / ID card / phone /
    email), then an exact case-insensitive match against known sensitive
    parameter names.  Returns the first hit as a dict, or None.
    :param text:
    :return:
    '''
    detectors = [sensitive_bankcard, sensitive_idcard, sensitive_phone, sensitive_email]
    keyword_names = ['username', 'memberid', 'nickname', 'loginid', 'mobilephone', 'userid', 'passportid',
                     'profile', 'loginname', 'loginid',
                     'email', 'realname', 'birthday', 'sex', 'ip']

    for detector in detectors:
        hit = detector(text)
        if hit:
            return hit
    lowered = text.lower()
    for keyword in keyword_names:
        if keyword.lower() == lowered:
            return {"type": "keyword", "content": keyword}

当检测到有敏感信息的时候,会对该请求去掉Referer重新请求一次,如果还能获取到敏感信息,即判断存在jsonp漏洞

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
result = self.check_sentive_content(self.response.text)
if not result:
    return
p = urlparse(self.requests.url)
fake_domain = "{}://{}".format(p.scheme, p.netloc) + random_str(4,
                                                                string.ascii_lowercase + string.digits) + ".com/"
headers = self.requests.headers
headers["Referer"] = fake_domain
req = requests.get(self.requests.url, headers=headers)
result2 = self.check_sentive_content(req.text)

php_real_path

主要是利用了后端如果没有关闭报错输出的话,在接收一些字符串的时候,如果传入的是array,会产生warning,利用这一点,可以获得一些敏感路径信息的泄漏

1
2
3
4
5
6
7
8
9
iterdatas = self.generateItemdatas()

for item in iterdatas:
    iterdata, positon = item
    for k, v in iterdata.items():
        data = copy.deepcopy(iterdata)
        del data[k]
        key = k + "[]"
        data[key] = v

判断依据,匹配

1
if "Warning" in r.text and "array given in " in r.text:

poc_fastjson

判断传输的是否是json格式,然后进行两步探测,第一步探测是否使用的是fastjson for 1.2.67

第二步探测Fastjson 1.24-1.27 ,根据dnslog的记录来判断是否存在fastjson反序列化漏洞

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
r = requests.post(self.requests.url, data=self.generate_check_fastjson(dnsdomain), headers=headers)
isFastjson = dnslog.check()
if isFastjson:
    result = self.new_result()
    result.init_info(self.requests.url, "使用了Fastjson", VulType.CODE_INJECTION)
    result.add_detail("payload", r.reqinfo, generateResponse(r),
                      "第三方dnslog有日志回显:{}".format(repr(isFastjson)), "", "", PLACE.GET)
    self.success(result)
else:
    return

reqlist = []
for payload in [self.generate_payload_1_2_24(dnsdomain), self.generate_payload_1_2_47(dnsdomain)]:
    r = requests.post(self.requests.url, data=payload, headers=headers)
    reqlist.append(r)
dnslist = dnslog.check()
if dnslist:
    result = self.new_result()
    result.init_info(self.requests.url, "Fastjson Poc 1.24-1.27", VulType.CODE_INJECTION)
    for req in reqlist:
        result.add_detail("payload请求", req.reqinfo, generateResponse(req),
                          "第三方dnslog有日志回显:{}".format(repr(dnslist)), "", "", PLACE.POST)
    self.success(result)

shiro

判断response是否存在deleteme字段,如果存在,进行key爆破,采用的cc链子,如果不存在,会进行探测,在cookie中主动设置rememberMe,判断返回中是否存在deleteme来判断

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
if "deleteMe" in respHeader.get('Set-Cookie', ''):
		isShiro = True
		result = ResultObject(self)
		result.init_info(self.requests.url, "Shiro框架发现", VulType.BASELINE)
		result.add_detail("payload探测", self.requests.raw, self.response.raw,
		                  "在返回的cookie中发现了deleteMe标记", "", "", PLACE.GET)
		self.success(result)

if not isShiro:
# 如果不是shiro框架,检测一下
		reqHeader = self.requests.headers
		if "Cookie" not in reqHeader:
				reqHeader["Cookie"] = ""
		_cookie = paramToDict(reqHeader["Cookie"], place=PLACE.COOKIE)
		_cookie["rememberMe"] = "2"
		reqHeader["Cookie"] = url_dict2str(_cookie, PLACE.COOKIE)
		req = None
		if self.requests.method == HTTPMETHOD.GET:
				req = requests.get(self.requests.url, headers=reqHeader)
		elif self.requests.method == HTTPMETHOD.POST:
				req = requests.post(self.requests.url, data=self.requests.post_data, headers=reqHeader)
		if req and "deleteMe" in req.headers.get('Set-Cookie', ''):
				...

if isShiro:
		self._check_key()

sqli_bool

sqli_error

结合的payload是利用了一些闭合报错、函数报错、布尔运算报错来探测

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
_payloads = [
            '鎈\'"\(',
            "'", "')", "';", '"', '")', '";', ' order By 500 ', "--", "-0",
            ") AND {}={} AND ({}={}".format(num, num + 1, num, num),
            " AND {}={}%23".format(num, num + 1),
            " %' AND {}={} AND '%'='".format(num, num + 1), " ') AND {}={} AND ('{}'='{}".format(num, num + 1, s, s),
            " ' AND {}={} AND '{}'='{}".format(num, num + 1, s, s),
            '`', '`)',
            '`;', '\\', "%27", "%%2727", "%25%27", "%60", "%5C",
            "extractvalue(1,concat(char(126),md5({})))".format(num),
            "convert(int,sys.fn_sqlvarbasetostr(HashBytes('MD5','{}')))".format(num)
        ]

匹配的报错正则,如发现有即判断有报错注入

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
{'regex': '"Message":"Invalid web service call', 'type': 'ASP.Net'},
{'regex': 'Exception of type', 'type': 'ASP.Net'},
{'regex': '--- End of inner exception stack trace ---', 'type': 'ASP.Net'},
{'regex': 'Microsoft OLE DB Provider', 'type': 'ASP.Net'},
{'regex': 'Error ([\d-]+) \([\dA-Fa-f]+\)', 'type': 'ASP.Net'},
{'regex': 'at ([a-zA-Z0-9_]*\.)*([a-zA-Z0-9_]+)\([a-zA-Z0-9, \[\]\&\;]*\)', 'type': 'ASP.Net'},
{'regex': '([A-Za-z]+[.])+[A-Za-z]*Exception: ', 'type': 'ASP.Net'},
{'regex': 'in [A-Za-z]:\([A-Za-z0-9_]+\)+[A-Za-z0-9_\-]+(\.aspx)?\.cs:line [\d]+', 'type': 'ASP.Net'},
{'regex': 'Syntax error in string in query expression', 'type': 'ASP.Net'},
{'regex': '\.java:[0-9]+', 'type': 'Java'}, {'regex': '\.java\((Inlined )?Compiled Code\)', 'type': 'Java'},
{'regex': '\.invoke\(Unknown Source\)', 'type': 'Java'}, {'regex': 'nested exception is', 'type': 'Java'},
{'regex': '\.js:[0-9]+:[0-9]+', 'type': 'Javascript'}, {'regex': 'JBWEB[0-9]{{6}}:', 'type': 'JBoss'},
{'regex': '((dn|dc|cn|ou|uid|o|c)=[\w\d]*,\s?){2,}', 'type': 'LDAP'},
{'regex': '\[(ODBC SQL Server Driver|SQL Server|ODBC Driver Manager)\]', 'type': 'Microsoft SQL Server'},
{'regex': 'Cannot initialize the data source object of OLE DB provider "[\w]*" for linked server "[\w]*"','type': 'Microsoft SQL Server'}, 
{'regex': 'You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near','type': 'MySQL'},
{'regex': 'Illegal mix of collations \([\w\s\,]+\) and \([\w\s\,]+\) for operation', 'type': 'MySQL'},
{'regex': 'at (\/[A-Za-z0-9\.]+)*\.pm line [0-9]+', 'type': 'Perl'},
{'regex': '\.php on line [0-9]+', 'type': 'PHP'}, {'regex': '\.php</b> on line <b>[0-9]+', 'type': 'PHP'},
{'regex': 'Fatal error:', 'type': 'PHP'}, {'regex': '\.php:[0-9]+', 'type': 'PHP'},
{'regex': 'Traceback \(most recent call last\):', 'type': 'Python'},
{'regex': 'File "[A-Za-z0-9\-_\./]*", line [0-9]+, in', 'type': 'Python'},
{'regex': '\.rb:[0-9]+:in', 'type': 'Ruby'}, {'regex': '\.scala:[0-9]+', 'type': 'Scala'},
{'regex': '\(generated by waitress\)', 'type': 'Waitress Python server'}, 
{'regex': '132120c8|38ad52fa|38cf013d|38cf0259|38cf025a|38cf025b|38cf025c|38cf025d|38cf025e|38cf025f|38cf0421|38cf0424|38cf0425|38cf0427|38cf0428|38cf0432|38cf0434|38cf0437|38cf0439|38cf0442|38cf07aa|38cf08cc|38cf04d7|38cf04c6|websealerror','type': 'WebSEAL'},
{'type': 'ASPNETPathDisclosure','regex': "<title>Invalid\sfile\sname\sfor\smonitoring:\s'([^']*)'\.\sFile\snames\sfor\smonitoring\smust\shave\sabsolute\spaths\,\sand\sno\swildcards\.<\/title>"},
{'type': 'Struts2DevMod','regex': 'You are seeing this page because development mode is enabled.  Development mode, or devMode, enables extra'},
{'type': 'Django DEBUG MODEL','regex': "You're seeing this error because you have <code>DEBUG = True<\/code> in"},
{'type': 'RailsDevMode', 'regex': '<title>Action Controller: Exception caught<\/title>'},
{'type': 'RequiredParameter', 'regex': "Required\s\w+\sparameter\s'([^']+?)'\sis\snot\spresent"},
{'type': 'Thinkphp3 Debug', 'regex': '<p class="face">:\(</p>'},
{'type': 'xdebug', "regex": "class='xdebug-error xe-fatal-error'"}

sql_time

时间盲注,主要分两步,第一步投放带休眠的payload,如果回显时间delta1大于设置的休眠时间,再发送一个不带休眠的payload,这一次的回显的时间delta0 满足 delta1 > delta0 > 0 ,将这两步执行两次(默认),如果两次都满足条件,则证明有时间盲注

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
sql_times = {
    "MySQL": (
        " AND SLEEP({})".format(self.sleep_str),
        " AND SLEEP({})--+".format(self.sleep_str),
        "' AND SLEEP({})".format(self.sleep_str),
        "' AND SLEEP({})--+".format(self.sleep_str),
        "' AND SLEEP({}) AND '{}'='{}".format(self.sleep_str, num, num),
        '''" AND SLEEP({}) AND "{}"="{}'''.format(self.sleep_str, num, num)),
    "Postgresql": (
        "AND {}=(SELECT {} FROM PG_SLEEP({}))".format(num, num, self.sleep_str),
        "AND {}=(SELECT {} FROM PG_SLEEP({}))--+".format(num, num, self.sleep_str),
    ),
    "Microsoft SQL Server or Sybase": (
        " waitfor delay '0:0:{}'--+".format(self.sleep_str),
        "' waitfor delay '0:0:{}'--+".format(self.sleep_str),
        '''" waitfor delay '0:0:{}'--+'''.format(self.sleep_str)),
    "Oracle": (
        " and 1= dbms_pipe.receive_message('RDS', {})--+".format(self.sleep_str),
        "' and 1= dbms_pipe.receive_message('RDS', {})--+".format(self.sleep_str),
        '''"  and 1= dbms_pipe.receive_message('RDS', {})--+'''.format(self.sleep_str),
        "AND 3437=DBMS_PIPE.RECEIVE_MESSAGE(CHR(100)||CHR(119)||CHR(112)||CHR(71),{})".format(self.sleep_str),
        "AND 3437=DBMS_PIPE.RECEIVE_MESSAGE(CHR(100)||CHR(119)||CHR(112)||CHR(71),{})--+".format(
            self.sleep_str),
    )
}

主体检测代码如下

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
for i in range(self.verify_count):
    start_time = time.perf_counter()
    r1 = self.req(position, new_dict)
    if not r1:
        continue
    end_time_1 = time.perf_counter()
    delta1 = end_time_1 - start_time
    if delta1 > self.sleep_time:
        r0 = self.req(position, zero_dict)
        end_time_0 = time.perf_counter()
        delta0 = end_time_0 - end_time_1
        if delta1 > delta0 > 0:
            flag += 1
            delta = round(delta1 - delta0, 3)
            continue
    break

ssti

主体思路,先对self.response的html进行html解析,如果标签是input key的值是name 那么result加入这个value值,如果标签是script,解析这段js代码获取body,然后获取里面key是name的value值

这里发现一个小问题,这里pyjsparser 解析javascript好像只支持ECMAScript 5.1,打算后面重写,利用Js2py写,所以我这里对代码做了一点点改动

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
def getParamsFromHtml(html):
    """Collect candidate parameter names from an HTML document.

    Gathers the value of the first ``name`` attribute of every ``<input>``
    tag, plus any identifiers extracted from inline ``<script>`` bodies
    via the JS parser.
    """
    parser = MyHTMLParser()
    parser.feed(html)
    names = set()
    for token in parser.getTokenizer():
        tag = token["tagname"].lower()
        if tag == "input":
            # only the first name="..." attribute of each input counts
            for attr_key, attr_value in token["attibutes"]:
                if attr_key == "name":
                    names.add(attr_value)
                    break
        elif tag == "script":
            try:
                body = pyjsparser.parse(token["content"]).get("body", [])
            except pyjsparser.pyjsparserdata.JsSyntaxError:
                # was `return []` upstream; skip just this broken script
                continue
            names |= set(analyse_js(body))
    return list(names)

然后其实还发现一个问题,一般input这种大多出现在post表单,但是w13scan只是利用接收到的请求方式,继续利用这个请求,比如你传到w13scan是GET,然后发包的时候也是GET,导致后端要接收POST数据的,接收不到,这是一个小问题,很容易漏掉很简单的漏洞点,还有如果是调用的js,也存在问题,需要想办法解决这种场景的探测,但是这属于主动扫描场景了,对被动扫描器来说要求有点高了

/images/w13scan源码解析/Untitled.png

然后先是通过随机生成的字符串探测这些参数,如果回显存在这些随机字符串,再进行ssti漏洞的探测

1
2
3
4
5
for key in parse_params:
    params_data[key] = random_str(6)
		params_data.update(self.requests.params)
    resp = requests.get(self.requests.netloc, params=params_data, headers=self.requests.headers).text
		iterdatas = self.generateItemdatas(params_data)

进行两次检验,每一次检验会发起三次请求,代码如下,如果满足条件,则判断存在ssti漏洞

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
r1 = self.test_ssti(data, k, position)
if r1:
    r2 = self.test_ssti(data, k, position)
    if r2:
        result = self.new_result()
        result.init_info(self.requests.url, "SSTI模板注入", VulType.SSTI)
        result.add_detail("第一次payload请求", r1["request"], r1["response"],
                          r1["desc"], k, r1["payload"], position)
        result.add_detail("第二次payload请求", r2["request"], r2["response"],
                          r2["desc"], k, r2["payload"], position)
        self.success(result)
        break
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def test_ssti(self, data, k, positon):
        randnum1 = random.randint(1000, 10000)
        randnum2 = random.randint(8888, 20000)
        checksum = str(randnum1 * randnum2)
        ssti_payloads = self.getSSTIPayload(randnum1, randnum2)
        for payload in ssti_payloads:
            data[k] = payload
            # 不编码请求
            r1 = self.req(positon, url_dict2str(data, positon))
            if checksum in r1.text:
                return {
                    "request": r1.reqinfo,
                    "response": generateResponse(r1),
                    "desc": "payload:{} 会回显{} 不编码payload".format(payload, checksum),
                    "payload": payload
                }
            # url编码请求
            r1 = self.req(positon, data)
            if checksum in r1.text:
                return {
                    "request": r1.reqinfo,
                    "response": generateResponse(r1),
                    "desc": "payload:{} 会回显{} url编码payload".format(payload, checksum),
                    "payload": payload
                }
            # html编码请求
            data[k] = html.escape(data[k])
            r1 = self.req(positon, data)
            if checksum in r1.text:
                return {

struts2_032 struts2_045

直接打payload,然后正则匹配关键字。

uauth

先判断请求头里是否有以下字段 [“cookie”, “token”, “auth”],删除了 cookie token auth 后,访问的页面与之前正常访问的相似度是多少,来判断是否有未授权访问

1
2
min_len = min(len(resp), len(r.text))
self.seqMatcher = difflib.SequenceMatcher(None, resp[:min_len], r.text[:min_len])

webpack

1
2
3
4
if self.requests.suffix.lower() == '.js':
    new_url = self.requests.url + ".map"
    req = requests.get(new_url, headers=self.requests.headers)
    if req.status_code == 200 and 'webpack:///' in req.text:

XSS

PerFolder

backup_folder

访问如下文件,判断回显是否是200及内容前十个字节是否是一些压缩包的文件头来判断是否存在备份文件

1
2
3
file_dic = ['bak.rar', 'bak.zip', 'backup.rar', 'backup.zip', 'www.zip', 'www.rar', 'web.rar', 'web.zip',
                    'wwwroot.rar',
                    'wwwroot.zip', 'log.zip', 'log.rar'
1
2
3
4
content = r.raw.read(10)
if r.status_code == 200 and self._check(content):
		if int(r.headers.get('Content-Length', 0)) == 0:
				continue

directory_browse

判断页面是否存在如下字符串,有则存在目录遍历

1
2
3
4
5
6
7
8
flag_list = [
            "directory listing for",
            "<title>directory",
            "<head><title>index of",
            '<table summary="directory listing"',
            'last modified</a>',

        ]

phpinfo_craw

访问如下文件,是否存在phpinfo页面 ,判断依据flag = "<title>phpinfo()</title>"

1
2
3
4
5
6
7
8
9
variants = [
                "phpinfo.php",
                "pi.php",
                "php.php",
                "i.php",
                "test.php",
                "temp.php",
                "info.php",
            ]

repository_leak

仓库泄漏,访问一些仓库信息,匹配正则在value中

1
2
3
4
5
6
7
flag = {
            "/.svn/all-wcprops": "svn:wc:ra_dav:version-url",
            "/.git/config": 'repositoryformatversion[\s\S]*',
            "/.bzr/README": 'This\sis\sa\sBazaar[\s\S]',
            '/CVS/Root': ':pserver:[\s\S]*?:[\s\S]*',
            '/.hg/requires': '^revlogv1.*'
        }

PerServer

backup_domain

针对域名备份文件的扫描,比如访问的是www.baidu.com,会请求

http://www.baidu.com/www.zip

http://www.baidu.com/www.rar

http://www.baidu.com/baidu.zip

http://www.baidu.com/baidu.rar

http://www.baidu.com/com.zip

http://www.baidu.com/com.rar

1
2
3
>>> from tld import parse_tld
>>> parse_tld('http://www.baidu.com/')
('com', 'baidu', 'www')
1
2
3
4
5
for payload in payloads:
    if not payload:
        continue
    for i in ['.rar', '.zip']:
        test_url = domain + payload + i

errorpage

访问一个不存在的错误页面,可以从这个页面中获取一些信息,然后进行错误页面正则匹配

1
domain = "{}://{}/".format(p.scheme, p.netloc) + random_str(6) + ".jsp"

idea

先请求 xml payload = domain + ".idea/workspace.xml" 然后判断后输出

iis_parse

W13SCAN/scanners/PerServer/iis_parse.py

请求 domain/robots.txt/.php

判断请求头 响应体

1
2
3
4
5
6
7
payload = domain + "robots.txt/.php"

r = requests.get(payload, headers=headers, allow_redirects=False)

ContentType = r.headers.get("Content-Type", '')

if 'html' in ContentType and "allow" in r.text:

net_xss

请求了两个payload

1
2
3
4
5
6
7
payload = "(A({}))/".format(random_str(6))

url = domain + payload

new_payload = "(A(\"onerror='{}'{}))/".format(random_str(6), random_str(6))

url2 = domain + new_payload

如果payload在响应中没有被编码、仍原样存在,就认为是存在XSS的

swf_files 通用flash的xss

多个swf加payload 然后计算返回的页面的md5值来判断

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
FileList = []
FileList.append(arg + 'common/swfupload/swfupload.swf')
FileList.append(arg + 'adminsoft/js/swfupload.swf')
FileList.append(arg + 'statics/js/swfupload/swfupload.swf')
FileList.append(arg + 'images/swfupload/swfupload.swf')
FileList.append(arg + 'js/upload/swfupload/swfupload.swf')
FileList.append(arg + 'addons/theme/stv1/_static/js/swfupload/swfupload.swf')
FileList.append(arg + 'admin/kindeditor/plugins/multiimage/images/swfupload.swf')
FileList.append(arg + 'includes/js/upload.swf')
FileList.append(arg + 'js/swfupload/swfupload.swf')
FileList.append(arg + 'Plus/swfupload/swfupload/swfupload.swf')
FileList.append(arg + 'e/incs/fckeditor/editor/plugins/swfupload/js/swfupload.swf')
FileList.append(arg + 'include/lib/js/uploadify/uploadify.swf')
FileList.append(arg + 'lib/swf/swfupload.swf')
md5_list = [
'3a1c6cc728dddc258091a601f28a9c12',
'53fef78841c3fae1ee992ae324a51620',
'4c2fc69dc91c885837ce55d03493a5f5',
]
for payload in FileList:
		payload1 = payload + "?movieName=%22]%29}catch%28e%29{if%28!window.x%29{window.x=1;alert%28%22xss%22%29}}"
		req = requests.get(payload1, headers=self.requests.headers)
		if req.status_code == 200:
				md5_value = md5(req.content)

值得学习的思路

url的去重,用的是泛化思想

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def etl(str, onlyNUM=False):
    """Normalize a string for URL de-duplication ("generalization").

    Each character is lower-cased and mapped to a class marker:
    letters -> 'A', digits -> 'N', characters in the global ``Chars``
    set -> 'T', everything else -> 'C'.  With ``onlyNUM=True`` only
    digits are replaced (by 'N'); other characters are kept (lower-cased).

    :param str: input string (parameter name kept for API compatibility,
        although it shadows the builtin)
    :param onlyNUM: replace digits only
    :return: the normalized string
    """
    # Fix: the original re-tested `not onlyNUM` inside the branch that had
    # already established it; flattened into a single decision per char.
    chars = ""
    for c in str:
        c = c.lower()
        if onlyNUM:
            chars += 'N' if '0' <= c <= '9' else c
        elif 'a' <= c <= 'z':
            chars += 'A'
        elif '0' <= c <= '9':
            chars += 'N'
        elif c in Chars:
            chars += 'T'
        else:
            chars += 'C'
    return chars

属性用字典的方法来表示

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import copy
import time
import types

class AttribDict(dict):
    """
    This class defines the dictionary with added capability to access members as attributes
    (i.e. ``d.foo`` behaves like ``d["foo"]``).  Pattern borrowed from sqlmap.
    """

    def __init__(self, indict=None, attribute=None):
        # indict: optional initial mapping; attribute: opaque user payload
        if indict is None:
            indict = {}

        # Set any attributes here - before initialisation
        # these remain as normal attributes
        self.attribute = attribute
        dict.__init__(self, indict)
        # Once this flag lands in __dict__, __setattr__ switches from normal
        # attribute assignment to item assignment (see __setattr__ below).
        self.__initialised = True

        # After initialisation, setting attributes
        # is the same as setting an item

    def __getattr__(self, item):
        """
        Maps values to attributes
        Only called if there *is NOT* an attribute with this name
        """

        try:
            return self.__getitem__(item)
        except KeyError:
            # surface missing keys as AttributeError so hasattr() works
            raise AttributeError("unable to access item '%s'" % item)

    def __setattr__(self, item, value):
        """
        Maps attributes to values
        Only if we are initialised
        """

        # This test allows attributes to be set in the __init__ method
        # ("_AttribDict__initialised" is the name-mangled form of the
        # self.__initialised flag set at the end of __init__)
        if "_AttribDict__initialised" not in self.__dict__:
            return dict.__setattr__(self, item, value)

        # Any normal attributes are handled normally
        elif item in self.__dict__:
            dict.__setattr__(self, item, value)

        else:
            self.__setitem__(item, value)

    def __getstate__(self):
        # pickle support: state is just the instance __dict__
        return self.__dict__

    def __setstate__(self, dict):
        # NOTE: parameter deliberately (if unfortunately) shadows builtin dict
        self.__dict__ = dict

    def __deepcopy__(self, memo):
        # Copy both attribute-style members and dict items; memo guards
        # against reference cycles.
        retVal = self.__class__()
        memo[id(self)] = retVal

        for attr in dir(self):
            if not attr.startswith('_'):
                value = getattr(self, attr)
                # skip functions/methods: only data members are copied
                if not isinstance(value, (types.BuiltinFunctionType, types.FunctionType, types.MethodType)):
                    setattr(retVal, attr, copy.deepcopy(value, memo))

        for key, value in self.items():
            retVal.__setitem__(key, copy.deepcopy(value, memo))

        return retVal

使用方式

1
2
3
4
5
6
7
8
path = AttribDict()
def setPaths(root):
    """Register the project directory layout on the global ``path`` registry."""
    path.root = root
    # every sub-path lives directly under the project root and shares
    # its attribute name with its folder name
    for folder in ("certs", "scanners", "data", "fingprints", "output"):
        setattr(path, folder, os.path.join(root, folder))

一些小bug

proxy代理出问题

在python3.9.8中,使用proxy的话会报错,原因是在处理如下情况时候会导致在处理127.0.0.1:8080的时候因为没有协议头,导致出现问题。

1
2
3
4
PROXY_CONFIG = {
    "http": "127.0.0.1:8080",
    "https": "127.0.0.1:8080"
}

/images/w13scan源码解析/Untitled1.png

修复了bug

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# lib/proxy/baseproxy.py 450行
def hanleproxy(self, proxy):
    """Split a proxy URL such as ``http://host:port`` into ``[host, port]``.

    If no port is present the returned list has a single element.
    """
    netloc = urlparse(proxy).netloc
    return netloc.split(":", 1)

    def proxy_connect(self):
        """Open the upstream socket, routing through any configured proxy.

        Without a proxy a plain socket is used; otherwise a PySocks
        socket is configured from conf["proxy"], then connected to
        (self.hostname, self.port) with a 10s timeout.
        """
        if not conf["proxy_config_bool"]:
            self._proxy_sock = socket()
        else:
            self._proxy_sock = socks5.socksocket()
            proxy = conf["proxy"]
            # first matching scheme wins — mirrors the original elif chain
            for scheme, proxy_type in (("socks5", socks5.SOCKS5),
                                       ("socks4", socks5.SOCKS4),
                                       ("http", socks5.HTTP),
                                       ("https", socks5.HTTP)):
                if scheme in proxy:
                    hostname, port = self.hanleproxy(proxy[scheme])
                    self._proxy_sock.set_proxy(proxy_type, hostname, int(port))
                    break
        self._proxy_sock.settimeout(10)
        self._proxy_sock.connect((self.hostname, int(self.port)))

反序列化参数判断

这里is_base64函数我们看到,第一用了b16,第二这里bytes.decode(errors=‘ignore’)在解析正常的反序列数据的时候是会报错的

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
def isJavaObjectDeserialization(value):
    """Heuristically detect a base64-encoded Java serialized object.

    NOTE(review): this is the original (buggy) version discussed in the
    surrounding text — is_base64() decodes with base64.b16decode, so real
    "rO0AB..." payloads fail to decode and the check misses them.
    """
    if len(value) < 10:
        return False
    # "ro0ab" is the (case-folded) base64 prefix of the 0xACED0005 magic
    if value[0:5].lower() == "ro0ab":
        ret = is_base64(value)
        if not ret:
            return False
        # NOTE(review): bytes(ret) on a str raises TypeError without an
        # encoding — presumably why the fixed version drops this wrapper
        if bytes(ret).startswith(bytes.fromhex("ac ed 00 05")):
            return True
    return False
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
def is_base64(value: str):
    """
    Return the decoded value on success, False on failure.

    NOTE(review): original buggy version — base64.b16decode expects hex
    text, so ordinary base64 input raises binascii.Error and this always
    returns False; it looks like b64decode was intended (see the fix below).
    :param value:
    :return:
    """
    regx = '^[a-zA-Z0-9\+\/=\%]+$'  # permissive charset: also allows '%'
    if not re.match(regx, value):
        return False
    try:
        ret = base64.b16decode(value).decode(errors='ignore')
    except binascii.Error:
        return False
    return ret

/images/w13scan源码解析/Untitled2.png

修改如下,仅针对java反序列化,其实python反序列化解析也存在问题,fix类似

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def isJavaObjectDeserialization(value, isJava=True):
    """Return True if *value* looks like a base64-encoded Java serialized object.

    :param value: candidate parameter value
    :param isJava: decode to raw bytes (required for the magic-byte check)
    :return: True when the decoded payload starts with 0xAC ED 00 05
    """
    if len(value) < 10:
        return False
    # "ro0ab" is the (case-folded) base64 prefix of the serialization magic
    if value[0:5].lower() == "ro0ab":
        ret = is_base64(value, isJava)
        if not ret:
            return False
        # 0xAC ED 00 05 = Java object serialization stream magic + version
        if ret.startswith(bytes.fromhex("ac ed 00 05")):
            return True
    return False

def is_base64(value: str, isjava=False):
    """
    Return the decoded value on success, False on failure.

    :param value: base64 text (missing '=' padding is tolerated)
    :param isjava: when True return raw bytes instead of decoded text
    :return: bytes (isjava=True) or str, or False when decoding fails
    """
    # pad to a multiple of 4 so unpadded inputs still decode
    value = value + '=' * (4 - len(value) % 4) if len(value) % 4 != 0 else value
    regx = '^[a-zA-Z0-9\+\/=\%]+$'
    if not re.match(regx, value):
        return False
    try:
        if isjava:
            ret = base64.b64decode(value)  # keep bytes for the magic check
        else:
            # Fix: base64.base64decode does not exist (AttributeError);
            # the correct stdlib function is base64.b64decode.
            ret = base64.b64decode(value).decode(errors='ignore')
    except binascii.Error:
        return False
    return ret