Python利用socket实现多进程的端口扫描器
Sir 老王 人气:0作为开发人员经常需要查看服务的端口开启状态判断服务是否宕机。
特别是部署的服务比较多的情况下,可能存在几个甚至几十个服务端口的占用,于是我利用socket不断向服务发送请求的方式来判断端口服务是否已经完成开启。
其中加入多进程的调用方式来提高端口扫描的效率,供大家参考!
首先,我们将需要的python模块全部导入到我们的代码块中,若是没有安装的模块使用pip的当时安装一下即可。
# Importing the socket module. import socket # Importing the datetime module from the datetime package. from datetime import datetime # It's a shortcut for `from multiprocessing import Pool` from multiprocessing.dummy import Pool # It's a shortcut for `from loguru import logger` from loguru import logger
然后,创建一个端口扫描类PortScanner来完成对整个业务逻辑的处理,另外,封装到类中也便于大家参考和修改。
class PortScanner: def __init__(self): """ A constructor. It is called when an object is created from a class and it allows the class to initialize the attributes of a class. """ super(PortScanner, self).__init__() self.remote_ip = None self.ports = [] def scanner(self, port): """ It scans the port. :param port: The port you want to scan """ try: socket_ = socket.socket(socket.AF_INET, socket.SOCK_STREAM) result_ = socket_.connect_ex((self.remote_ip, port)) if result_ == 0: logger.info('地址:{} 端口:{} 已成功开启!'.format(self.remote_ip, port)) else: logger.info('地址:{} 端口:{} 未开启!'.format(self.remote_ip, port)) except Exception as e: logger.error('端口扫描出现异常!') finally: socket_.close() def start(self): """ It starts the game. """ remote_server = input("输入要扫描的主机地址(127.0.0.1):") if remote_server.strip() == '': remote_server = '127.0.0.1' self.remote_ip = socket.gethostbyname(remote_server) port_range = input("输入要扫描的端口范围(1,50000):") scanner_ports = [] if port_range.strip() == '': port_range = '1,50000' scanner_ports = [n for n in range(int(port_range.split(',')[0]), int(port_range.split(',')[1]))] socket.setdefaulttimeout(0.5) start_ = datetime.now() pool = Pool(processes=10) pool.map(self.scanner, scanner_ports) pool.close() pool.join() end_ = datetime.now() logger.info('所有端口扫描已完成,总共耗时:{}'.format(str(end_ - start_)))
使用python模块中的main函数调用整个端口扫描器执行扫描任务。
# It's a common idiom to determine if the script is being run directly or being imported. if __name__ == '__main__': scanner_ = PortScanner() scanner_.start()
加载全部内容