【白帽故事】打造一款多线程目录爆破扫描器
VSole2022-03-04 12:35:33
声明:文章中涉及的程序(方法)可能带有攻击性,仅供安全研究与教学之用,读者将其信息做其他用途,由用户承担全部法律及连带责任,文章作者不承担任何法律及连带责任。 |
背景介绍:
今天的分享来自国外一位ID为Mayank Pandey的白帽子,他分享了使用Python多线程功能打造的一款多线程网站目录爆破扫描器。废话不多说,上代码(代码很少,100行不到):
from threading import Threadimport time,requests,sys,os.path def usage(): print("----------USAGE INSTRUCTION ---------") print(f"{sys.argv[0]} URL WORDLIST NUMBER_OF_THREADS(Default is 10)") sys.exit() def prepare(myList,numOfChunks): for i in range(0, len(myList), numOfChunks): yield myList[i:i + numOfChunks] def brute(myList,url): start=time.perf_counter() for lists in myList: threads.append(Thread(target=worker,args=(lists,url),daemon=True)) for thread in threads: try: thread.start() except KeyboardInterrupt: print("Received Keyboard Interrupt , Terminating threads") sys.exit() for thread in threads: try: thread.join() except KeyboardInterrupt: print("Received Keyboard Interrupt , Terminating threads") sys.exit() finish=time.perf_counter() print(f"\t\t Checked {total_len} Directories in {round(finish-start,2)} Seconds") def worker(lists,url): try: for word in lists: if word.startswith("/"): word=word[1:] url2=url+"/"+word.strip() r=requests.get(url2) if str(r.status_code) in match: print(f"/{word.strip():<40} [ Status: {r.status_code} Length:{len(r.content)} ]") except KeyboardInterrupt: print("Received Keyboard Interrupt , Terminating threads") sys.exit() except Exception as e: print(f"An error Occurred : {e}") sys.exit() if __name__ == "__main__": try: match=['200','301','302','401','403','429'] #change this to filter responses try: if sys.argv[1]: url=sys.argv[1] if sys.argv[2]: wordlist=sys.argv[2] try: if sys.argv[3]: numOfThreads=int(sys.argv[3]) except: numOfThreads=10 except: usage() if os.path.isfile(wordlist)==False: print(f"The file {wordlist} doesn't exist") sys.exit() with open(wordlist,'r') as w: myList=w.readlines() total_len=len(myList) final=[] threads=[] if numOfThreads>total_len or numOfThreads<0: print("Too High Value for Threads with Respect to Input Word-list") sys.exit(1) numOfChunks=len(myList)//numOfThreads if url.endswith("/"): url=url[0:-1] print(f''' ====================================== URL --> {url} Word-list --> {wordlist} Threads --> {numOfThreads} Status Codes --> {','.join([w for w in match])} ====================================== ''') print("------- Started Brute forcing Directories -------") myList_new=prepare(myList,numOfChunks) brute(myList_new,url) except Exception as e: print(f"An error Occurred : {e}") sys.exit()
代码详解:
导入和使用详细信息:
from threading import Threadimport time,requests,sys,os.path def usage(): print("----------USAGE INSTRUCTION ---------") print(f"{sys.argv[0]} URL WORDLIST NUMBER_OF_THREADS") sys.exit()
这一段代码主要是导入线程和相关模块,Usage()函数向用户展示了如何使用程序,它在命令行参数不足时会被触发。
分块处理:
def prepare(myList,numOfChunks): for i in range(0, len(myList), numOfChunks): yield myList[i:i + numOfChunks]
该函数是线程数发挥作用的主要地方,利用它对主列表进行分块处理。
多线程逻辑:
def brute(myList,url): start=time.perf_counter() for lists in myList: threads.append(Thread(target=worker,args=(lists,url),daemon=True)) for thread in threads: try: thread.start() except KeyboardInterrupt: print("Received Keyboard Interrupt , Terminating threads") sys.exit() for thread in threads: try: thread.join() except KeyboardInterrupt: print("Received Keyboard Interrupt , Terminating threads") sys.exit() finish=time.perf_counter() print(f"\t\t Checked {total_len} Directories in {round(finish-start,2)} Seconds") def worker(lists,url): try: for word in lists: if word.startswith("/"): word=word[1:] url2=url+"/"+word.strip() r=requests.get(url2) if str(r.status_code) in match: print(f"/{word.strip():<40} [ Status: {r.status_code} Length:{len(r.content)} ]") except KeyboardInterrupt: print("Received Keyboard Interrupt , Terminating threads") sys.exit() except Exception as e: print(f"An error Occurred : {e}") sys.exit()
上面定义了两个函数brute()和worker(),线程会从brute()函数开始,worker()函数将负责处理请求。
threads.append(Thread(target=worker,args=(lists,url),daemon=True))
从分块列表中提取列表,并使用它启动Thread。
线程将以Worker函数作为目标,这些函数将被附加到包含所有线程的列表中,在此之后,将启动并关联所有线程。
for thread in threads: thread.start() for thread in threads: thread.join()
thread.start():
创建线程实例时,它并不会立即执行,所以需要调用它的start()方法。一旦启动,线程将独立运行,直到目标函数返回。
thread.join():
在调用join()方法时,调用线程会被阻塞,直到终止线程对象(在其上调用线程),线程对象会在以下任何一种情况下终止:
- 正常情况
- 处理一个不当的异常时
- 发生超时
这有助于所有线程完成其工作并正常退出。
在Worker方法关闭(所有线程都关闭)之后,控制将返回到brute()函数,然后继续接下来正常的程序流程。
大概就是这样,使用Python的多线程创建一个简单的扫描工具,但是请记住不要使用太多的线程来运行你的程序,因为一旦处理不当,可能会导致其它程序的崩溃。
当然这个扫描器的代码目前已经放在Github上

VSole
网络安全专家