1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
| import sys from socket import socket, AF_INET, SOCK_STREAM import time from asyncio import Queue, TimeoutError, gather from typing import List
from async_timeout import timeout
class ScanPort(object): def __init__(self, ip: str = '', time_out: float = 0.1, port: List[int] = None, concurrency: int = 500): if not ip: raise ValueError(f'wrong ip! {ip}') self.ip = ip self.port = port self.result: List[int] = [] self.loop = self.get_event_loop() self.queue = Queue(loop=self.loop) self.timeout = time_out self.concurrency = concurrency
@staticmethod def get_event_loop(): """ 判断不同平台使用不同的事件循环实现
:return: """ if sys.platform == 'win32': from asyncio import ProactorEventLoop return ProactorEventLoop() else: from asyncio import SelectorEventLoop return SelectorEventLoop()
async def scan(self): while True: t1 = time.time() port = await self.queue.get() sock = socket(AF_INET, SOCK_STREAM) try: with timeout(self.timeout): await self.loop.sock_connect(sock, (self.ip, port)) t2 = time.time() if sock: self.result.append(port) print(time.strftime('%Y-%m-%d %H:%M:%S'), port, 'open', round(t2 - t1, 2)) except (TimeoutError, PermissionError, ConnectionRefusedError) as _: sock.close() sock.close() self.queue.task_done()
async def start(self): start = time.time() if self.port: for a in self.port: self.queue.put_nowait(a) else: for a in range(1, 65535 + 1): self.queue.put_nowait(a) task = [self.loop.create_task(self.scan()) for _ in range(self.concurrency)] await self.queue.join() for a in task: a.cancel() await gather(*task, return_exceptions=True) print(f'扫描所用时间为:{time.time() - start:.2f}')
if __name__ == '__main__': scan = ScanPort('127.0.0.1') scan.loop.run_until_complete(scan.start())
|