一个跨平台的端口扫描器,Win10
和 Ubuntu
平台已经过测试,运行没有问题。
不严谨性能测试,默认状态下,扫描全部端口,超时 0.1
秒,500 并发的情况下:
Windows 大概 14 秒扫描完成。 Ubuntu 大概 3 秒钟扫描完成。 参考了此链接 ,我用异步队列重新实现了扫描功能,增加了跨平台功能,封装成了类。
貌似比参考链接中的快了那么一点,嘿嘿嘿。
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 import sysfrom socket import socket, AF_INET, SOCK_STREAMimport timefrom asyncio import Queue, TimeoutError, gatherfrom typing import List from async_timeout import timeoutclass ScanPort (object ): def __init__ (self, ip: str = '' , time_out: float = 0.1 , port: List [int ] = None , concurrency: int = 500 ): if not ip: raise ValueError(f'wrong ip! {ip} ' ) self.ip = ip self.port = port self.result: List [int ] = [] self.loop = self.get_event_loop() self.queue = Queue(loop=self.loop) self.timeout = time_out self.concurrency = concurrency @staticmethod def get_event_loop (): """ 判断不同平台使用不同的事件循环实现 :return: """ if sys.platform == 'win32' : from asyncio import ProactorEventLoop return ProactorEventLoop() else : from asyncio import SelectorEventLoop return SelectorEventLoop() async def scan (self ): while True : t1 = time.time() port = await self.queue.get() sock = socket(AF_INET, SOCK_STREAM) try : with timeout(self.timeout): await self.loop.sock_connect(sock, (self.ip, port)) t2 = time.time() if sock: self.result.append(port) print (time.strftime('%Y-%m-%d %H:%M:%S' ), port, 'open' , round (t2 - t1, 2 )) except (TimeoutError, PermissionError, ConnectionRefusedError) as _: sock.close() sock.close() self.queue.task_done() async def start (self ): start = time.time() if self.port: for a in self.port: self.queue.put_nowait(a) else : for a in range (1 , 65535 + 1 ): self.queue.put_nowait(a) task = [self.loop.create_task(self.scan()) for _ in range (self.concurrency)] await self.queue.join() for a in task: a.cancel() await gather(*task, return_exceptions=True ) print (f'扫描所用时间为:{time.time() - start:.2 f} ' ) if __name__ == '__main__' : scan = ScanPort('127.0.0.1' ) scan.loop.run_until_complete(scan.start())
参考链接:
https://www.invincible.fun/2018/05/16/python并发之协程(1)/
https://docs.python.org/zh-cn/3.7/library/asyncio-queue.html#asyncio-queues
本文章首发于个人博客 LLLibra146’s blog
本文作者 :LLLibra146
更多文章请关注公众号 (LLLibra146):
版权声明 :本博客所有文章除特别声明外,均采用 © BY-NC-ND 许可协议。非商用转载请注明出处!严禁商业转载!
本文链接 :https://blog.d77.xyz/archives/13302492.html