亲宝软件园·资讯

展开

Python利用socket实现多进程的端口扫描器

Sir 老王 人气:0

作为开发人员经常需要查看服务的端口开启状态判断服务是否宕机。

特别是部署的服务比较多的情况下,可能存在几个甚至几十个服务端口的占用,于是我利用socket不断向服务发送请求的方式来判断端口服务是否已经完成开启。

其中加入多进程的调用方式来提高端口扫描的效率,供大家参考!

首先,我们将需要的python模块全部导入到我们的代码块中,若是没有安装的模块使用pip的当时安装一下即可。

# Importing the socket module.
import socket

# Importing the datetime module from the datetime package.
from datetime import datetime

# It's a shortcut for `from multiprocessing import Pool`
from multiprocessing.dummy import Pool

# It's a shortcut for `from loguru import logger`
from loguru import logger

然后,创建一个端口扫描类PortScanner来完成对整个业务逻辑的处理,另外,封装到类中也便于大家参考和修改。

class PortScanner:
    def __init__(self):
        """
        A constructor. It is called when an object is created from a class and it allows the class to initialize the
        attributes of a class.
        """
        super(PortScanner, self).__init__()
        self.remote_ip = None
        self.ports = []

    def scanner(self, port):
        """
        It scans the port.

        :param port: The port you want to scan
        """
        try:
            socket_ = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            result_ = socket_.connect_ex((self.remote_ip, port))
            if result_ == 0:
                logger.info('地址:{} 端口:{} 已成功开启!'.format(self.remote_ip, port))
            else:
                logger.info('地址:{} 端口:{} 未开启!'.format(self.remote_ip, port))
        except Exception as e:
            logger.error('端口扫描出现异常!')
        finally:
            socket_.close()

    def start(self):
        """
        It starts the game.
        """
        remote_server = input("输入要扫描的主机地址(127.0.0.1):")
        if remote_server.strip() == '':
            remote_server = '127.0.0.1'
        self.remote_ip = socket.gethostbyname(remote_server)
        port_range = input("输入要扫描的端口范围(1,50000):")
        scanner_ports = []
        if port_range.strip() == '':
            port_range = '1,50000'
        scanner_ports = [n for n in range(int(port_range.split(',')[0]), int(port_range.split(',')[1]))]
        socket.setdefaulttimeout(0.5)
        start_ = datetime.now()
        pool = Pool(processes=10)
        pool.map(self.scanner, scanner_ports)
        pool.close()
        pool.join()
        end_ = datetime.now()
        logger.info('所有端口扫描已完成,总共耗时:{}'.format(str(end_ - start_)))

使用python模块中的main函数调用整个端口扫描器执行扫描任务。

# It's a common idiom to determine if the script is being run directly or being imported.
if __name__ == '__main__':
    scanner_ = PortScanner()
    scanner_.start()

加载全部内容

相关教程
猜你喜欢
用户评论