Writing an Efficient Port Scanner in Python

PyPortScanner

A multithreaded port scanner written in Python.

[Figure: output example]

GitHub

The source code, documentation, and detailed usage instructions for this port scanner are available on GitHub: PythonPortScanner by Yaokai.


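Background

Sometimes, when doing network-related research, we need to take targeted measurements of certain parameters, and port scanning is one of the more common and important of these. Port scanning means determining, through a TCP handshake or some other method, whether particular ports on a given host are open, that is, in the listening state. The most widely used port-scanning tool today is nmap. Without question, nmap is a very powerful and easy-to-use piece of software. However, nmap runs in a terminal, so calling it from other code is sometimes inconvenient, and a corresponding library may not even be available. In addition, nmap depends on quite a few other libraries, so an older system may be unable to run a recent version of nmap, which makes scanning inconvenient. Furthermore, some of nmap's scan types, such as the SYN scan, require root privileges. For these reasons, I used the libraries that ship with Python 2.7 to develop an efficient multithreaded port scanner that meets these needs.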

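Implementation

I. Scanning a given (ip, port) pair with a TCP handshake

To implement port scanning, we first need to understand how to use a Python socket to perform a TCP handshake with a given (ip, port). To complete the handshake, we first initialize a TCP socket. The code for creating a new TCP socket in Python is as follows: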

TCP_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) #(1)
TCP_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) #(2)
TCP_sock.settimeout(delay) #(3)

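Line (1) initializes the socket. The socket.AF_INET argument specifies an IPv4 socket, and the socket.SOCK_STREAM argument specifies a TCP socket. This gives us a socket that uses IPv4 and TCP.
Line (2) uses socket.setsockopt() to set further socket options. socket.SOL_SOCKET indicates that the option being set applies at the socket level, and socket.SO_REUSEPORT enables port reuse for this socket. For the precise meaning of socket.SO_REUSEPORT, see my other article.
Line (3) sets the socket's connection timeout to the value of the delay variable (in seconds), so that we do not wait too long on any single connection.
Now that we know how to create a socket, we can make a TCP connection to a given (ip, port) pair. The code is as follows: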

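try:
    result = TCP_sock.connect_ex((ip, int(port_number)))

    # If the TCP handshake is successful, the port is OPEN. Otherwise it is CLOSE
    if result == 0:
        output[port_number] = 'OPEN'
    else:
        output[port_number] = 'CLOSE'

except socket.error as e:
    # The connection attempt raised an error, so treat the port as CLOSE
    output[port_number] = 'CLOSE'

finally:
    # Always close the socket, whether the port was open, closed, or raised an error
    TCP_sock.close()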

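Because this is an I/O operation, we wrap it in a try/except block to handle any exceptions that may occur. We connect to the target address with socket.connect_ex() and use the status code it returns to decide whether the connection succeeded. A return value of 0 means success, so in that case we record the port as open; otherwise we record it as closed. If the connection attempt raises an exception, we also record the port as closed, because it could not be connected to (possibly because of a firewall or packet filtering).
Note that once the connection attempt is finished we must call socket.close() to close the TCP connection to the remote port. Otherwise our scan may cause the so-called hanging TCP connection problem.

Putting it together, the complete code for the TCP handshake scan is as follows: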
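
As a minimal, self-contained sketch of the check described above (not the author's code), the hypothetical helper `check_port` wraps `connect_ex()` in `try`/`finally` so the socket is closed on every path. The demo opens a throwaway listener on 127.0.0.1 so it can run without touching a remote host. The helper name, the loopback listener, and the default timeout are assumptions made for illustration; the `'OPEN'`/`'CLOSE'` strings mirror the ones used in this article.

```python
import socket


def check_port(ip, port, timeout=1.0):
    """Return 'OPEN' if a TCP handshake with (ip, port) succeeds, else 'CLOSE'."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(timeout)
    try:
        # connect_ex() returns 0 on success and an errno value on failure
        result = sock.connect_ex((ip, int(port)))
        return 'OPEN' if result == 0 else 'CLOSE'
    except socket.error:
        # e.g. address-related errors, which connect_ex() can still raise
        return 'CLOSE'
    finally:
        # Always release the socket, whatever happened above
        sock.close()


def demo():
    # Hypothetical local listener so the example does not scan a remote host
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.bind(('127.0.0.1', 0))   # port 0: let the OS pick a free port
    listener.listen(1)
    port = listener.getsockname()[1]

    while_listening = check_port('127.0.0.1', port)
    listener.close()
    after_close = check_port('127.0.0.1', port)
    return while_listening, after_close


if __name__ == '__main__':
    print(demo())
```

The `finally` clause is the design choice worth noting: the socket is closed whether the handshake succeeded, failed, or raised.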

"""
Perform status checking for a given port on a given ip address using TCP handshake

Keyword arguments:
ip -- the ip address that is being scanned
port_number -- the port that is going to be checked
delay -- the time in seconds that a TCP socket waits until timeout
output -- a dict() that stores result pairs in {port, status} style (status = 'OPEN' or 'CLOSE')
"""
def __TCP_connect(ip, port_number, delay, output):
    # Initialize the TCP socket object
    TCP_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    TCP_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
    TCP_sock.settimeout(delay)

    try:
        result = TCP_sock.connect_ex((ip, int(port_number)))

        # If the TCP handshake is successful, the port is OPEN. Otherwise it is CLOSE
        if result == 0:
            output[port_number] = 'OPEN'
        else:
            output[port_number] = 'CLOSE'

    except socket.error as e:
        # The connection attempt raised an error, so treat the port as CLOSE
        output[port_number] = 'CLOSE'

    finally:
        # Always close the socket, even if an error was raised
        TCP_sock.close()

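II. Scanning ports with multiple threads

Single-threaded scanning is logically simple but extremely inefficient. Scanning sends and receives a large number of packets, so it is an I/O-bound operation, and a single-threaded program wastes a great deal of time waiting for replies. Multithreading is therefore well worth using. A natural approach is to start a separate thread for each port.
Here we set the list of ports to scan to the 1,000 most frequently used ports as given by Nmap: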

__port_list = [1,3,6,9,13,17,19,20,21,22,23,24,25,30,32,37,42,49,53,70,79,80,81,82,83,84,88,89,99,106,109,110,113,119,125,135,139,143,146,161,163,179,199,211,222,254,255,259,264,280,301,306,311,340,366,389,406,416,425,427,443,444,458,464,481,497,500,512,513,514,524,541,543,544,548,554,563,...]

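For the complete port list, see top 1K commonly used ports.
For a given IP address, the scan proceeds as follows:

  1. Take a port from the list.
  2. Create a new thread that uses the __TCP_connect() function to connect to that (ip, port).
  3. Call thread.start() and thread.join() so that the child thread starts working and the main thread waits for the child thread to die before it finishes.
  4. Repeat until every port has been scanned.

Based on this approach, the multithreaded scanning code is as follows: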
"""
Open multiple threads to perform port scanning

Keyword arguments:
ip -- the ip address that is being scanned
delay -- the time in seconds that a TCP socket waits until timeout
output -- a dict() that stores result pairs in {port, status} style (status = 'OPEN' or 'CLOSE')
"""
def __scan_ports_helper(ip, delay, output):

    '''
    Multithreading port scanning
    '''

    threads = []
    port_index = 0

    while port_index < len(__port_list):

        # Ensure that the number of concurrently running threads does not exceed the thread limit
        while threading.activeCount() < __thread_limit and port_index < len(__port_list):

            # Start threads
            thread = threading.Thread(target = __TCP_connect, args = (ip, __port_list[port_index], delay, output))
            thread.start()
            threads.append(thread)
            port_index = port_index + 1

    # Block until all threads complete. Joining only after every thread has been
    # started lets the scans overlap instead of running one at a time.
    for thread in threads:
        thread.join()

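The __thread_limit parameter limits the number of threads. output is a dictionary that stores the scan results in {port: status} form.
thread.join() ensures that the main thread continues only after the child threads have finished, which guarantees that every port is scanned.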
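
Polling `threading.activeCount()` in a loop keeps the CPU busy while waiting for a free slot. One alternative way to cap concurrency, sketched here under assumptions and not taken from the author's repository, is a fixed pool of worker threads that pull ports from a queue. The names `scan_with_pool` and `num_workers` and the injectable `check` callable are hypothetical; `check` stands in for a function that, like `__TCP_connect()`, probes one port, except that it returns the status string.

```python
import threading

try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2.7


def scan_with_pool(ip, ports, check, num_workers=4):
    """Run check(ip, port) for every port using at most num_workers threads."""
    tasks = queue.Queue()
    for port in ports:
        tasks.put(port)

    output = {}
    lock = threading.Lock()

    def worker():
        while True:
            try:
                port = tasks.get_nowait()
            except queue.Empty:
                return          # no work left, let the thread exit
            status = check(ip, port)
            with lock:          # guard the shared dict explicitly
                output[port] = status

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.start()
    for t in threads:           # join only after every worker has been started
        t.join()
    return output


if __name__ == '__main__':
    # Stand-in check function: pretend only ports 22 and 80 are open
    fake_check = lambda ip, port: 'OPEN' if port in (22, 80) else 'CLOSE'
    print(scan_with_pool('192.0.2.1', [21, 22, 80, 443], fake_check))
```

Because the queue is filled before any worker starts, an empty queue reliably means the work is done, and the thread count never exceeds `num_workers`.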

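III. Scanning multiple websites with multiple threads

If, while scanning ports in multiple threads, we can also scan multiple websites in multiple threads, scanning efficiency improves further. To do this, we need an additional thread per website that manages all of the child threads scanning that website's ports.
In this setup we must also remove thread.join() from __scan_ports_helper(). Otherwise the main thread is blocked by the port-scanning child threads, and we cannot scan multiple websites concurrently.
Without join(), how do we make sure that a website's scanning thread returns only after all of its ports have been scanned? The method I use here is to check the length of the output dictionary, because once the whole scan is complete, the length of output must equal the length of __port_list.
The modified code is as follows: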

def __scan_ports_helper(ip, delay, output):

    '''
    Multithreading port scanning
    '''

    port_index = 0

    while port_index < len(__port_list):

        # Ensure that the number of concurrently running threads does not exceed the thread limit
        while threading.activeCount() < __thread_limit and port_index < len(__port_list):

            # Start threads
            thread = threading.Thread(target = __TCP_connect, args = (ip, __port_list[port_index], delay, output))
            thread.start()
            port_index = port_index + 1

    # Wait until a result has been recorded for every port in __port_list
    while (len(output) < len(__port_list)):
        continue

Based on the scanning-thread code above, the code for the thread that manages port scanning is as follows:

"""
Controller of the __scan_ports_helper() function

Keyword arguments:
websites -- the websites (hosts) that are going to be scanned
output_ip -- kept from the original signature; not used in this excerpt
delay -- the time in seconds that a TCP socket waits until timeout
"""

def __scan_ports(websites, output_ip, delay):

    scan_result = {}
    threads = []

    for website in websites:
        website = str(website)
        scan_result[website] = {}

        # One managing thread per website
        thread = threading.Thread(target = __scan_ports_helper, args = (website, delay, scan_result[website]))
        thread.start()
        threads.append(thread)

    # Block the script until all managing threads complete
    for thread in threads:
        thread.join()

    return scan_result

With that, we have all the code for a multithreaded port scanner.
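
To illustrate the start-everything-then-join pattern that lets several websites be scanned at once, here is a small sketch under assumptions. `scan_hosts` and the `scan_one` callable are hypothetical names; `scan_one(host)` stands in for whatever per-host routine returns a {port: status} dict. Each manager thread writes to its own key, and the caller joins the threads only after all of them have been started.

```python
import threading


def scan_hosts(hosts, scan_one):
    """Scan every host concurrently; return {host: result of scan_one(host)}."""
    results = {}

    def manager(host):
        # One manager thread per host; each writes to a distinct key
        results[host] = scan_one(host)

    threads = []
    for host in hosts:
        t = threading.Thread(target=manager, args=(str(host),))
        t.start()               # start without joining, so hosts overlap
        threads.append(t)

    for t in threads:           # wait for all hosts only after all have started
        t.join()
    return results


if __name__ == '__main__':
    # Stand-in per-host scanner with canned results, no network traffic
    canned = {'a.example': {80: 'OPEN'}, 'b.example': {80: 'CLOSE'}}
    print(scan_hosts(['a.example', 'b.example'], lambda h: canned[h]))
```

Joining the manager threads is also an alternative to polling the length of a result dictionary: `join()` blocks without spinning and returns even if a host's scan ends early.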

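IV. Putting it all together: using this code to scan a given website and print the results

For ease of output, I did not scan multiple websites in multiple threads while also scanning each website's ports in multiple threads. This example scans ports in multiple threads but handles only one website at a time. The combined code is as follows: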

import sys
import subprocess
import socket
import threading
import time

class PortScanner:

    # default ports to be scanned
    # or put any ports you want to scan here!
    __port_list = [1,3,6,9,13,17,19,20,21,22,23,24,25,30,32,37,42,49,53,70,79,80,81,82,83,84,88,89,99,106,109,110,113,119,125,135,139,143,146,161,163,179,199,211,222,254,255,259,264,280,301,306,311,340,366,389,406,416,425,427,443,444,458,464,481,497,500,512,513,514,524,541,543,544,548,554,563]
    # default thread number limit
    __thread_limit = 1000
    # default connection timeout in seconds
    __delay = 10


    """
    Constructor of a PortScanner object

    Keyword arguments:
    target_ports -- the list of ports that is going to be scanned (default self.__port_list)
    """
    def __init__(self, target_ports = None):
        # If target ports not given in the arguments, use default ports
        # If target ports is given in the arguments, use given port lists
        if target_ports is None:
            self.target_ports = self.__port_list
        else:
            self.target_ports = target_ports


    """
    Return the usage information for invalid input host name. 
    """
    def __usage(self):
        print('python Port Scanner v0.1')
        print('please make sure the input host name is in the form of "something.com" or "http://something.com"!\n')


    """
    This is the function need to be called to perform port scanning

    Keyword arguments:
    host_name -- the hostname that is going to be scanned
    message -- the message that is going to be included in the scanning packets, in order to prevent
        ethical problem (default: '')
    """
    def scan(self, host_name, message = ''):

        if 'http://' in host_name or 'https://' in host_name:
            host_name = host_name[host_name.find('://') + 3 : ]

        print('*' * 60 + '\n')
        print('start scanning website: ' + str(host_name))

        try:
            server_ip = socket.gethostbyname(str(host_name))
            print('server ip is: ' + str(server_ip))

        except socket.error as e:
            # If the DNS resolution of a website cannot be finished, abort that website.

            #print(e)
            print('hostname %s unknown!!!' % host_name)

            self.__usage()

            return {}

            # May need to return specified values to the DB in the future

        start_time = time.time()
        output = self.__scan_ports(server_ip, self.__delay, message)
        stop_time = time.time()

        print('host %s scanned in  %f seconds' %(host_name, stop_time - start_time))

        print('finish scanning!\n')

        return output


    """
    Set the maximum number of thread for port scanning

    Keyword argument:
    num -- the maximum number of thread running concurrently (default 1000)
    """
    def set_thread_limit(self, num):
        num = int(num)

        if num <= 0 or num > 50000:

            print('Warning: Invalid thread number limit! Please make sure the thread limit is within the range of (1, 50,000)!')
            print('The scanning process will use default thread limit!')

            return

        self.__thread_limit = num


    """
    Set the time out delay for port scanning in seconds

    Keyword argument:
    delay -- the time in seconds that a TCP socket waits until timeout (default 10)
    """
    def set_delay(self, delay):

        delay = int(delay)
        if delay <= 0 or delay > 100:

            print('Warning: Invalid delay value! Please make sure the input delay is within the range of (1, 100)')
            print('The scanning process will use the default delay time')

            return 

        self.__delay = delay


    """
    Print out the list of ports being scanned
    """
    def show_target_ports(self):
        print ('Current port list is:')
        print (self.target_ports)


    """
    Print out the delay in seconds that a TCP socket waits until timeout
    """
    def show_delay(self):
        print ('Current timeout delay is :%d' %(int(self.__delay)))


    """
    Open multiple threads to perform port scanning

    Keyword arguments:
    ip -- the ip address that is being scanned
    delay -- the time in seconds that a TCP socket waits until timeout
    output -- a dict() that stores result pairs in {port, status} style (status = 'OPEN' or 'CLOSE')
    message -- the message that is going to be included in the scanning packets, in order to prevent
        ethical problem (default: '')
    """
    def __scan_ports_helper(self, ip, delay, output, message):

        '''
        Multithreading port scanning
        '''

        port_index = 0

        while port_index < len(self.target_ports):

            # Ensure that the number of concurrently running threads does not exceed the thread limit
            while threading.activeCount() < self.__thread_limit and port_index < len(self.target_ports):

                # Start threads
                thread = threading.Thread(target = self.__TCP_connect, args = (ip, self.target_ports[port_index], delay, output, message))
                thread.start()
                port_index = port_index + 1


    """
    Controller of the __scan_ports_helper() function

    Keyword arguments:
    ip -- the ip address that is being scanned
    delay -- the time in seconds that a TCP socket waits until timeout
    message -- the message that is going to be included in the scanning packets, in order to prevent
        ethical problem (default: '')
    """        
    def __scan_ports(self, ip, delay, message):

        output = {}

        thread = threading.Thread(target = self.__scan_ports_helper, args = (ip, delay, output, message))
        thread.start()

        # Wait until all port scanning threads have finished; sleep briefly instead of spinning
        while (len(output) < len(self.target_ports)):
            time.sleep(0.01)

        # Print open ports from small to large
        for port in self.target_ports:
            if output[port] == 'OPEN':
                print(str(port) + ': ' + output[port] + '\n')

        return output



    """
    Perform status checking for a given port on a given ip address using TCP handshake

    Keyword arguments:
    ip -- the ip address that is being scanned
    port_number -- the port that is going to be checked
    delay -- the time in seconds that a TCP socket waits until timeout
    output -- a dict() that stores result pairs in {port, status} style (status = 'OPEN' or 'CLOSE')
    message -- the message that is going to be included in the scanning packets, in order to prevent
        ethical problem (default: '')
    """
    def __TCP_connect(self, ip, port_number, delay, output, message):
        # Initialize the TCP socket object
        TCP_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        TCP_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        TCP_sock.settimeout(delay)


        # Initialize a UDP socket to send scanning alert message if there exists a non-empty message
        if message != '':
            UDP_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            UDP_sock.sendto(str(message), (ip, int(port_number)))
            UDP_sock.close()

        try:
            result = TCP_sock.connect_ex((ip, int(port_number)))

            # If the TCP handshake is successful, the port is OPEN. Otherwise it is CLOSE
            if result == 0:
                output[port_number] = 'OPEN'
                # Only a connected socket can carry the alert message
                if message != '':
                    TCP_sock.sendall(str(message))
            else:
                output[port_number] = 'CLOSE'

        except socket.error as e:

            output[port_number] = 'CLOSE'

        finally:
            # Always close the socket, even if an error was raised
            TCP_sock.close()
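
The scan() method strips a leading scheme by slicing the string, which leaves any path or port number in place. A hedged alternative is to let the standard library's URL parser extract the host. The helper name `normalize_host` is hypothetical and is not part of PyPortScanner.

```python
try:
    from urllib.parse import urlparse   # Python 3
except ImportError:
    from urlparse import urlparse       # Python 2.7


def normalize_host(host_name):
    """Return the bare host name from input such as 'http://something.com/path'."""
    host_name = str(host_name).strip()
    # urlparse only fills in the host when a scheme separator is present
    if '://' not in host_name:
        host_name = '//' + host_name
    return urlparse(host_name).hostname or ''


if __name__ == '__main__':
    for raw in ('something.com', 'http://something.com',
                'https://something.com:8443/index.html'):
        print(normalize_host(raw))
```

The returned value could then be passed to socket.gethostbyname() in the same way scan() does with its sliced string.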