Python Scapy小工具


title: Python Scapy小工具
date: 2020-04-08 16:10:51
tags:

  • Python
  • tools
  • scapy
    categories: 工具

toc: true

Scapy是一个可以让用户发送、侦听和解析并伪装网络报文的Python程序。这些功能可以用于制作侦测、扫描和攻击网络的工具。

换言之,Scapy 是一个强大的操纵报文的交互程序。它可以伪造或者解析多种协议的报文,还具有发送、捕获、匹配请求和响应这些报文以及更多的功能。Scapy 可以轻松地做到像扫描(scanning)、路由跟踪(tracerouting)、探测(probing)、单元测试(unit tests)、攻击(attacks)和发现网络(network discorvery)这样的传统任务。它可以代替hping,arpspoof,arp-sk,arping,p0f 甚至是部分的Namp,tcpdumptshark 的功能。

Scapy

参考:

Scapy 主要做两件事:发送报文和接收回应。您定义一系列的报文,它发送这些报文,收到回应,将收到的回应和请求匹配,返回一个存放着(request, answer)即(请求, 回应)的报文对(packet couples)的列表(list)和一个没有匹配的报文的列表(list)。这样对于像Nmaphping 这样的工具有一个巨大的优势:回应没有被减少 (open/closed/filtered)而是完整的报文。

导入scapy方式:

from scapy.all import *

常用包解析

SR包解析

sr(IP(dst='192.168.1.0/24')/TCP(dport=(1,65535)), timeout=2)

发送三层数据包,等待接收一个或多个数据包的响应(注意:当依次向每个IP的65535个端口发送完才算执行完这个函数,而不是调用一次只发一个包,以下所有发包方式都与之一样)

  • 发送ICMP数据包,查看接受到的数据
>>> reply_packet = sr(IP(dst='114.114.114.114')/ICMP(), timeout=2)
>>> print(type(reply_packet))
<class 'tuple'>
>>> print(reply_packet)
(<Results: TCP:0 UDP:0 ICMP:1 Other:0>, <Unanswered: TCP:0 UDP:0 ICMP:0 Other:0>)

解析:返回的数据包为tuple格式,也即是元组。元组内第一个元素为接受到的响应包,第二个元素为没有接受到的响应包。

  • 查看响应包的内容
>>> print(reply_packet[0].res)
[(<IP  frag=0 proto=icmp dst=114.114.114.114 |<ICMP  |>>, <IP  version=4 ihl=5 tos=0x0 len=28 id=57023 flags= frag=0 ttl=76 proto=icmp chksum=0x9083 src=114.114.114.114 dst=192.168.90.17 |<ICMP  type=echo-reply code=0 chksum=0x1600 id=0x0 seq=0x0 |<Padding  load='\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' |>>>)]
>>> print(reply_packet[0].res[0][1].fields)
{'options': [], 'version': 4, 'ihl': 5, 'tos': 0, 'len': 28, 'id': 57023, 'flags': <Flag 0 ()>, 'frag': 0, 'ttl': 76, 'proto': 1, 'chksum': 36995, 'src': '114.114.114.114', 'dst': '192.168.90.17'}

解析:得到的响应包是个列表,第一个元素为发送的数据包,第二个元素为返回的数据包。使用xx.res[0][1].fields(返回字典)查看返回包的具体内容;同时也可以直接用show(返回字符串)方法查看包体内容。其实reply_packet[0].res[0][1].fields == reply_packet[0].res[0][1][0].fields,res的第三位[0]表示精确到第几层,这里[0]代表IP层,[1]进一层到传输层,[2]进一步到应用层。

SR1包解析

reply_packet = sr1(IP(dst='114.114.114.114')/ICMP(), timeout=2)

发送三层数据包,并仅仅只等待接收一个数据包的响应

  • 发送ICMP数据包,查看接受到的数据
>>> reply_packet = sr1(IP(dst='114.114.114.114')/ICMP(), timeout=2)
>>> print(reply_packet)
b'E\x00\x00\x1cu\xb7\x00\x00K\x01\xfa\x8brrrr\xc0\xa8Z\x11\x00\x00\x16\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
>>> print(reply_packet.fields)
{'options': [], 'version': 4, 'ihl': 5, 'tos': 0, 'len': 28, 'id': 30135, 'flags': <Flag 0 ()>, 'frag': 0, 'ttl': 75, 'proto': 1, 'chksum': 64139, 'src': '114.114.114.114', 'dst': '192.168.90.17'}

解析:sr1返回的对象没有太多复杂东西,直接相当于sr返回对象的reply_packet[0].res[0][1],可以理解为sr1返回的对象是sr返回对象的一个子集。所以可以和sr一样直接用fields来读取其内容。

SRP包解析

reply_packet = srp(Ether(dst='00ff:ff:ff:ff:ff:ff')/IP(dst='114.114.114.114')/ICMP())

发送二层数据包,并且等待回应(这个函数可以编辑二层头部,sr()不能编辑二层头部)

  • 发送ICMP数据包,查看接受到的数据
>>> reply_packet = srp(Ether(dst='00ff:ff:ff:ff:ff:ff')/IP(dst='114.114.114.114')/ICMP())
>>> print(reply_packet[0].res[0][1].fields)
{'dst': '38:f9:d3:bd:38:f6', 'src': 'e8:68:19:42:1a:7d', 'type': 2048}

解析:这里默认只给出了二层头部,可以通过调节res后第三位的值来选择性查看每层信息,res的第三位[0]表示精确到第几层,[0]代表二层,[1]进一层到IP层,[2]进一步到传输层,[3]进一步到应用层

存活主机及端口扫描

主要是为了探测内网存活主机以及端口扫描。使用了线程池,就是觉得简单,没考虑其他情况。。。

  • -i 指定目标地址,192.168.90.2或者192.168.90.0/24
  • -t 同时指定线程池大小, --ta 指定存活主机扫描线程池,--tp指定端口扫描线程池
  • -p 80,81,88-90,90 指定扫描的端口,不指定则使用默认。
# -*- coding: utf-8 -*-
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# FileName  :  surhpscan.py
# Time      :  2020/04/15 15:13:28
# Author    :  OuDeNiu
# Email     :  havebutno@gmail.com
# Version   :  1.0
# Descript  :  主要用于内网主机探测(首先探测存活主机,然后对存活主机端口扫描)
#                   冗余部分为测试代码,,,
    # 以下修改为了美化
    # scapy模块sendrecv.py 的 SndRcvHandler类   verbose参数(文件的88行),修改为0
        # if verbose is None:
        #     verbose = 0
        #     # verbose = conf.verb
    #

import socket
import IPy
from colorama import Fore, init
from concurrent.futures import ThreadPoolExecutor
from optparse import OptionParser
from scapy.all import *
import datetime
import sys
import os


# 自动重置颜色
init(autoreset=True)


class Nscan():
    def __init__(self, hosts, threads=None, ta=None, tp=None, inp_port=None):
        # 默认扫描的端口好
        self.default_port = [20,21,22,23,25,80,81,82,83,84,85,86,87,88,89,90,95,107,115,135,139,161,174,389,443,445,444,465,631,636,512,513,751,1080,1081,1082,1083,1084,1085,1086,1087,1088,1089,1090,1433,1434,1521,1527,1534,1649,1985,1997,1999,3306,3389,5000,5357,5432,5698,6379,7001,7002,8000,8001,8002,8003,8004,8005,8006,8007,8008,8009,8010,8011,8012,8013,8080,8081,8082,8083,8084,8090,8091,8092,8093,9092,27017,11211]
        # 存放最终需要扫描的IP
        self.ips = []
        # 如果输入端口,则存放最终扫描的端口,如果不存放则使用默认的端口号
        self.port_scan = []
        # 接收参数
        self.hosts = hosts
        self.inp_port = inp_port
        self.threads = threads
        self.ta = ta
        self.tp = tp
        # 解析IP、线程、端口
        self.recv_ip()
        self.recv_thread()
        self.recv_port()
        # 最终存活主机的IP地址
        self.alive_ip = []
        # 端口扫描的结果 存放格式为:    [ ['IP',[port,port,port] , ['IP',[port,port,port]  ]
        self.result_port_scan = []
        
    def recv_thread(self):
        # 处理传入的线程池大小  最后形成self.ta(存活主机探测线程池)   self.tp(端口扫描线程池)
        if self.threads is None:
            if self.ta is None:
                self.ta = 10
                if self.tp is None:
                    self.tp = 5
                else: 
                    self.tp = self.tp
            else:
                self.ta = int(self.ta)
                if self.tp is None:
                    self.tp = 5
                else:
                    self.tp = int(self.tp)
        else:
            self.ta = self.tp = int(self.threads)

    # 解析IP并加入到需要探测的主机列表中
    def recv_ip(self):
        inp_ips = self.hosts.split(',')
        # 设置make_net 为1 时,可以自定义
        for ips in inp_ips:
            ips = IPy.IP(ips, make_net=1)
            for ip in ips:
                self.ips.append(str(ip))
        print(Fore.YELLOW + '[*]:已将您输入的IP解析完毕。正在启动主机存活探测,请稍后。。。')

    # 解析端口
    def recv_port(self):
        # 如果不指定端口则默认端口,否则对输入的参数进行转化
        if inp_port is None:
            self.port_scan = self.default_port
        else:
            # 以逗号分割输入的端口字符串
            f_port = self.inp_port.split(',')
            # 循环分割后的字符串,判断是否存在  '-' 号(为了可以实现  -p 81,82,8080-8090的方式)
            for i in f_port:
                if '-' in i:
                    s_port = i.split('-')
                    for port in range(int(s_port[0]),int(s_port[:-1])+1):
                        self.port_scan.append(port)
                else:
                    self.port_scan.append(int(i))
            print(Fore.YELLOW + '端口处理完毕,已指派您指定的端口。')


    # 判断主机是否存活
    def check_host(self, cip):
        try:
            # # 利用ARP进行扫描,默认将扫描结果加入到存活主机列表中  self.alive_ip
            # a,b = arping(cip)
            # if a.res:
            #     print('++++++++YES++++++++')
            #     self.alive_ip.append(cip)
            # else:
            #     print('主机不存活')

            # 利用ICMP发包进行存活主机探测,默认只发一个包(可设置retry参数指定次数,但是受timeout参数影响,在不超时时间内重试),  timeout影响准确性,建议为大于1
            a = sr1(IP(dst=cip)/ICMP()/"ceshi",timeout=1)
            # 修改了sendrecv.py 的 SndRcvHandler类   verbose参数(文件的88行),修改为0(非0时输出发包过程。这里为了自定义输出过程,美化程序。。。。。)

            # 判断响应包中ICMP是否为空,为空不存活,不为空输出 该IP存活,并将该IP加入到存活主机列表
            if a.getlayer(ICMP):
                # 输出存活信息 参考官网文档
                print(Fore.GREEN + a.sprintf("[*] %IP.src% is alive"))
                self.alive_ip.append(cip)
                return a.sprintf("[*] %IP.src% is alive")
            else:return None
        # 设置屏蔽错误信息
        except Exception as e:
            pass
            # print(e)
            # print('主机不存活')

    # # 判断主机存活的线程池
    # def thread_check_alive(self):
    #     pool = ThreadPoolExecutor(self.ta)
    #     for ip in self.ips:
    #         pool.submit(self.check_host, ip)
    #     pool.shutdown()

    # # scapy模块 sr()函数 TCP端口扫描(经过测试,在不开启多线程情况下,扫描准确性与timeout 有关,一般设置2s; 如果开启多线程,则扫描结果非常不精确。[未加锁情况下])
    # def check_port(self,cip):
    #     try:
    #         res,unans = sr(IP(dst=cip)/TCP(flags="S", dport=(self.default_port)),timeout=2)
    #         # 修改了sendrecv.py 的 SndRcvHandler类   verbose参数(文件的88行),修改为0(非0时输出发包过程。这里为了自定义输出过程,美化程序。。。。。)

    #         if res.res:
    #             port_reslut = []
    #             # 循环接受的数据包,读取其中头部flags信息,如果为SA则端口存活。
    #             for i in range(len(res.res)):
    #                 flags = res.res[i][1].getlayer(TCP).fields
    #                 if flags['flags'] == 'SA':
    #                     port_reslut.append(flags['sport'])
    #                     # RA 代表不存活 SA   存活
    #             # 经过上述循环判断,得到的存活端口号列表 与主机IP形成新的列表,存放在最终的扫描结果中。 如果端口扫描结果为空,则不计入最终结果
    #             if port_reslut:
    #                 cip_result = [cip,port_reslut]
    #                 self.result_port_scan.append(cip_result)
    #             else:pass
    #     except Exception as e:
    #         print(e)

    # socket连接,不返回banner信息
    def socket_port_scan(self,cip):
        try:
            port_reslut = []
            # 循环端口进行扫描
            for port in self.port_scan:
                s = socket.socket()
                s.settimeout(0.5)
                # print('[*]:'+cip+ ':' +str(port))
                if s.connect_ex((cip, port)) == 0:
                    # print('[+] : ' + str(port))
                    port_reslut.append(port) 
                    s.close()  
            # 经过上述循环判断,得到的存活端口号列表 与主机IP形成新的列表,存放在最终的扫描结果中。 如果端口扫描结果为空,则不计入最终结果
            if port_reslut:
                cip_result = [cip,port_reslut]
                self.result_port_scan.append(cip_result)
                print(Fore.GREEN + '[*]:{0} 开放端口扫描完毕'.format(cip))
                return {cip:port_reslut}
            else:return None
        # 屏蔽错误信息
        except Exception as e:
            pass
            # print(e)

    # #socket端口扫描,返回banner信息, 
    # # 但是由于测试中收到的信息无法解码,因此不调用该模块,先放在这里,不作处理(以前时可以的。。。。。。不知道为什么这里不行)
    # def socket_port_banner(self,cip):
    #     try:
    #         for port in self.default_port:
    #             s = socket.socket()
    #             s.settimeout(0.1)
    #             s.connect((host, port))
    #             s.sendall(bytes(r'aaa\r\n\r\n', encoding='utf-8'))
    #             banner = s.recv(1024)
    #             if banner:
    #                 print(Fore.GREEN + '[+] : ' + str(port) + '\t' + banner.decode('utf-8'))
    #             else:
    #                 print(Fore.GREEN + '[+] : ' + str(port) + '\t' + 'Unknow')
    #             s.close()
    #     except:
    #         pass
    

    # # 对于存活的主机进行端口扫描的线程池
    # def thread_port_scan(self):
    #     p = ThreadPoolExecutor(self.tp)
    #     for ip in self.alive_ip:
    #         p.submit(self.socket_port_scan,ip)
    #     p.shutdown()

# 写入日志文件
def log(l):
    if l.result():
        with open(os.path.split(os.path.abspath(sys.argv[0]))[0]+'/surhpscan.txt','a') as f:
            f.write(str(l.result()) + '\n')

# 线程池,接受函数,函数参数,线程池大小
def thread_pool(fn,pool,ips):
    p = ThreadPoolExecutor(pool)
    for ip in ips:
        p.submit(fn,ip).add_done_callback(log)
    p.shutdown()


def main():
    usage = Fore.CYAN + "%prog -i 192.168.10.0/24"
    parser = OptionParser(usage)
    parser.add_option('-i', '--ip', dest='ip', help="设置扫描的IP地址")
    parser.add_option('-t', '--threads', dest='threads',
                      help="同时指定,默认为None.Mac建议设置20左右,Win都可以。", default=None)
    parser.add_option('-p', '--port', dest='port',
                      help='自定义端口号:80,81,88-90,90', default=None)
    parser.add_option('--ta',dest='thread_alive',help='探测存活主机时的线程池(默认10)',default=10)
    parser.add_option('--tp', dest='thread_port_scan', help='端口探测时线程池(默认5)',default=5)
    # parser.add_option()
    (options, args) = parser.parse_args()
    if options.ip:
        return options.ip, options.threads, options.port, options.thread_alive, options.thread_port_scan
    else:
        exit(parser.error(Fore.RED + '参数输入有误,请重试。'))


if __name__ == "__main__":
    banner = '''
                    .__                                       
  ________ _________|  |__ ______  ______ ____ _____    ____  
 /  ___/  |  \_  __ \  |  \\\\____ \/  ___// ___\\\__  \  /    \ 
 \___ \|  |  /|  | \/   Y  \  |_> >___ \\\\  \___ / __ \|   |  \\
/____  >____/ |__|  |___|  /   __/____  >\___  >____  /___|  /
     \/                  \/|__|       \/     \/     \/     \/ 
                                               by zhangsan V1.0
                                                 2019-12-30
    '''
    start_time = datetime.datetime.now()
    print(Fore.YELLOW + banner)
    # 获取参数
    inp_ip, inp_threads, inp_port, inp_thread_alive, inp_thread_port_scan = main()
    # 如果同时指定线程池,初始化时候直接传入
    # hosts, threads=None, ta=None, tp=None, inp_port=None
    nscan = Nscan(inp_ip,inp_threads,inp_thread_alive,inp_thread_port_scan,inp_port)
    # nscan.thread_check_alive()
    # nscan.thread_port_scan()
    # print(nscan.alive_ip)
    # print(len(nscan.alive_ip))
    # print('+++++++++++++++++++++++')
    # print(len(nscan.result_port_scan))
    thread_pool(nscan.check_host,nscan.ta,nscan.ips)
    print(Fore.YELLOW + '[+]:存活主机扫描完毕。共发现存活主机{0}台'.format(len(nscan.alive_ip)))
    print(Fore.CYAN + '[*]---------Port Scaning--------[*]')
    thread_pool(nscan.socket_port_scan,nscan.tp,nscan.alive_ip)
    # 输出存活主机端口结果
    for i in nscan.result_port_scan:
        print(i)
    print(Fore.YELLOW + '[+]:存活主机的端口已扫描完毕。有结果的共{0}条。'.format(len(nscan.result_port_scan)))
    end_time = datetime.datetime.now()
    print(Fore.CYAN + '[Time] : 程序执行时间 {0} 秒'.format((end_time - start_time).seconds))

ARP扫描

简单使用了ARP模块,没办法,毕竟是最快的。

  • python3 cscan.py 192.168.90.1
  • python3 cscan.py 192.168.90.0.24
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# FileName  :  cscan.py
# Time      :  2020/04/10 15:43:08
# Author    :  OuDeNiu
# Email     :  havebutno@gmail.com
# Version   :  1.0
# Descript  :  测试ARP

from scapy.all import *
from colorama import Fore, init
import sys
import logging
import datetime

logging.basicConfig(level = logging.NOTSET,format = '%(message)s')

# 自动重置颜色
init(autoreset=True)

def check_host_arp(cip):
    a,b = srp(Ether(dst="ff:ff:ff:ff:ff:ff")/ARP(pdst=cip),timeout=5)
    # print(a.res)
    # print(a.res)
    # print(a.res[0][1].getlayer(ARP).fields)
    for ip in a.res:
        # print(ip[1].getlayer(ARP).fields)
        # mac = ip[1].getlayer(ARP).fields['hwsrc']
        # result_ip = ip[1].getlayer(ARP).fields['psrc']
        result_ip = ip[1].getlayer(ARP).fields['psrc']
        logging.debug(Fore.GREEN + str(result_ip) + " is alive.")

if __name__ == "__main__":
    start_time = datetime.datetime.now()
    hosts = sys.argv[1]
    logging.debug(Fore.CYAN + "开始扫描 " + hosts)
    check_host_arp(hosts)
    end_time = datetime.datetime.now()
    print(Fore.CYAN + '[Time] : 程序执行时间 {0} 秒'.format((end_time - start_time).seconds))

原本是想加上弱口令扫描的,因为python2 的socket返回的是字符串,python3的socket是字节,部分字节无法解码就出了问题。。。没有解决所以。。。

所有原创文章采用 知识共享署名-非商业性使用 4.0 国际许可协议 进行许可。
您可以自由的转载和修改,但请务必注明文章来源并且不可用于商业目的。
本站部分内容收集于互联网,如果有侵权内容、不妥之处,请联系我们删除。敬请谅解!

  Previous post MySQL小结
Next post   sqlmap关于MSSQL执行命令思考

评论已关闭

无论有多困难,都坚强地抬头挺胸,告诉所有人,你并非他们想象的那样不堪一击。

每个人心里都有一段伤痕,时间才是最好的疗剂。

人总是珍惜未得到的,而遗忘了所拥有的。

退一步,并不象征我认输;放手,并不表示我放弃;微笑,并不意味我快乐!

人海中再回首,朋友真诚依旧,生命里重逢,心境平和温柔,往事如风,岁月如歌,漫漫人生路,苍桑几许,幸福几何!