跳过正文
  1. 文章/

《Python黑帽子》python3代码实现(第三章)

·563 字·3 分钟
Rain
作者
Rain
一个安全行业的小透明
《Python黑帽子》 - 这篇文章属于一个选集。
§ : 本文

第三章 网络:原始套接字和流量嗅探
#

Windows上和Linux上的包嗅探
#

在示例中,我们首先导入socket与os模块,根据os.name判断主机操作系统,该变量属性当前只注册了三个值,具体如下:

posix nt java
Linux Windows Java虚拟机
根据操作系统创建原始套接字,这里因为Linux系统只能嗅探到ICMP的数据包,所以这里创建的是基于ICMP包的原始套接字IPPROTO_ICMP,但windows可以嗅探到所有IP数据包IPPROTO_IP,所以这里需要使用os.name对主机操作系统进行判断,然后使用原始套接字构造IP头socket(socket.AF_INET,socket.SOCK_RAW, socket.IPPROTO_IP),使用bind()方法对网卡进行监听,这里ICMP包是不具备端口号的,所以端口号为0,然后使用setsockopt()方法设置在捕获的数据包中包含IP头,如果操作系统为windows的话,这里需要使用ioctl()方法开启网卡混杂模式,然后使用recvfrom()方法读取单个数据库,用if语句对操作系统再次判断,如为windows则关闭网卡混杂模式。
import socket
import os

# 监听的网卡  0.0.0.0表示所有网卡
host = "172.16.1.7"

# 创建一个原始套接字(RAW Socket),然后绑定到公开接口上
if os.name == "nt":     # Mac的os.name==posix
    socket_protocol = socket.IPPROTO_IP
else:
    socket_protocol = socket.IPPROTO_ICMP
# Windows和Linux的区别是Windows允许我们嗅探所有协议的所有数据包,但Linux只能嗅探到ICMP数据。
# 使用原始套接字用于构造IP头
sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)

sniffer.bind((host, 0))    # 对网卡进行监听,由于ICMP包是不具备端口号的,所以端口填什么都可以,你可以try一try

# 设置在捕获的数据包中包含IP头
sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)

# 在Windows平台上,需要设置IOCTL以启动混杂模式,以允许我们嗅探网卡上经过的所有数据包(即使数据的目的地址不是本机)
if os.name == "nt":
    sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)

# 读取单个数据包
print(sniffer.recvfrom(65535))

# 在Windows平台上关闭混杂模式
if os.name == "nt":
    sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)

下面是执行代码得到的结果:

在这里插入图片描述

解码IP层
#

在理解代码之前,我们首先得要理解IP头结构:

在这里插入图片描述
详情可以参考https://www.cnblogs.com/jacklikedogs/articles/3848263.html 理解完IP头结构之后,我们先直接看看代码:

import socket
import os
import struct
from ctypes import *
# 监听的主机IP
host = "172.16.1.7"


# IP头定义
class IP(Structure):
    _fields_ = [
        ('ihl', c_ubyte, 4),            # 头长度ip header length
        ('version', c_ubyte, 4),        # 版本
        ('tos', c_ubyte),               # 服务类型
        ('len', c_ushort),              # ip数据包总长度
        ('id', c_ushort),               # 标识符
        ('offset', c_ushort),           # 片偏移
        ('ttl', c_ubyte),               # 生存时间
        ('protocol_num', c_ubyte),      # 协议数字(协议类型),数字代表协议,下面有代码甚至映射表
        ('sum', c_ushort),              # 头部校验和
        ('src', c_ulong),               # 源IP   linux系统下需要将类型改为c_uint32
        ('dst', c_ulong)                # 目的IP  同上
    ]

    def __new__(cls, socket_buffer=None):           # new()方法是在类准备将自身实例化时调用,将原始缓冲区中的数据填充到结构中
        return cls.from_buffer_copy(socket_buffer)

    def __init__(self, socket_buffer=None):
        # 协议字段与协议名称对应
        self.protocol_map = {1: 'ICMP', 6: 'TCP', 17: 'UDP'}

        # 可读性更强的IP地址
        self.src_address = socket.inet_ntoa(struct.pack("<L", self.src))        # struct.pack()将数据解码为"<"小端,"L"无符号长型
        self.dst_address = socket.inet_ntoa(struct.pack("<L", self.dst))        # inet_ntoa() 将32bit数值转换为IP地址

        # 匹配协议类型
        try:
            self.protocol = self.protocol_map[self.protocol_num]
        except:
            self.protocol = str(self.protocol_num)


if os.name == 'nt':
    socket_protocol = 0
else:
    socket_protocol = 1

sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)

sniffer.bind((host, 0))

sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)

if os.name == 'nt':
    sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)

try:
    while True:
        # 读取数据包
        raw_buffer = sniffer.recvfrom(65565)[0]
        # print(raw_buffer[0:20])
        # 将缓冲区的前20个字节按IP头进行解析
        ip_header = IP(raw_buffer[0:20])
        # 输出协议和通信双方的IP地址
        print("protocol: %s %s -> %s" % (ip_header.protocol, ip_header.src_address, ip_header.dst_address))
# 处理ctrl+c
except KeyboardInterrupt:
    if os.name == 'nt':
        sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)

示例中添加了对数据包解码,首先定义IP类,使用__new__()方法,当自身实例化时将原始缓冲区中的数据填充到我们刚刚定义的C结构体中,然后使用__init__()方法使数据可视化,最后使用前面示例用到的代码抓取数据包,得到数据包后将前20字节的数据导入结构体中进行初始化,然后输出数据包的信息。 这里__new__()方法和__init__()方法的区别这篇文章就写的非常好: https://blog.csdn.net/qq_36624456/article/details/98240234 下面是执行代码得到的结果:

在这里插入图片描述

解码ICMP
#

在理解代码之前,我们还是先看看ICMP包的结构: 这里因为各种状态的报文结构不一样,就把目的不可达报文单独拎出来,详细可以看下面这篇文章: https://www.cnblogs.com/scrat/archive/2012/08/02/2620163.html

在这里插入图片描述
接下来贴上代码:

import threading
import socket
import os
import struct
from ctypes import *
import time
from netaddr import IPNetwork, IPAddress

host = "172.16.1.7"
# 目标网段
subnet = "172.16.1.0/24"
# 自定义字段,用于辨别收到的包是否是响应我们的UDP请求
magic_message = b'PYTHONRULES!'


class IP(Structure):

    _fields_ = [
        ('ihl', c_ubyte, 4),
        ('version', c_ubyte, 4),
        ('tos', c_ubyte),
        ('len', c_ushort),
        ('id', c_ushort),
        ('offset', c_ushort),
        ('ttl', c_ubyte),
        ('protocol_num', c_ubyte),
        ('sum', c_ushort),
        ('src', c_ulong),
        ('dst', c_ulong)
    ]

    def __new__(cls, socket_buffer=None):
        return cls.from_buffer_copy(socket_buffer)

    def __init__(self, socket_buffer=None):

        self.protocol_map = {1: 'ICMP', 6: 'TCP', 17: 'UDP'}

        self.src_address = socket.inet_ntoa(struct.pack("<L", self.src))
        self.dst_address = socket.inet_ntoa(struct.pack("<L", self.dst))

        try:
            self.protocol = self.protocol_map[self.protocol_num]
        except:
            self.protocol = str(self.protocol_num)


# ICMP包头定义
class ICMP(Structure):

    _fields_ = [
        ("type", c_ubyte),          # 类型
        ("code", c_ubyte),          # 代码值
        ("checksum", c_ushort),     # 头部校验和
        ("unused", c_ushort),       # 未使用
        ("next_hop_mtu", c_ushort)  # 下一跳的MTU
    ]

    def __new__(cls, socket_buffer=None):
        return cls.from_buffer_copy(socket_buffer)

    def __init__(self, socket_buffer=None):
        pass


# 批量发送UDP请求包
def udp_sender(subnet, magic_message):
    time.sleep(2)
    sender = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    for ip in IPNetwork(subnet):
        try:
            # 设置端口为大于1023的端口号,尽量使用不常用端口
            sender.sendto(magic_message, (str(ip), 65212))
        except:
            pass


if os.name == 'nt':
    socket_protocol = socket.IPPROTO_IP
else:
    socket_protocol = socket.IPPROTO_ICMP

sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)

sniffer.bind((host, 0))
sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)

if os.name == 'nt':
    sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)

# 启用多线程发送UDP请求包
t = threading.Thread(target=udp_sender, args=(subnet, magic_message, ))
t.start()

try:
    while True:
        raw_buffer = sniffer.recvfrom(65535)[0]
        ip_header = IP(raw_buffer[0:20])
        # print("protocol: %s %s -> %s" % (ip_header.protocol, ip_header.src_address, ip_header.dst_address))
        # 如果协议为ICMP,则进行下一步处理
        if ip_header.protocol == 'ICMP':
            # 计算真实IP头长度 --> 计算公式:ihl(4位二进制换算十进制) * 4 = ip头长度(字节)
            offset = ip_header.ihl * 4
            buf = raw_buffer[offset:offset + sizeof(ICMP)]
            # 结构ICMP头数据
            icmp_header = ICMP(buf)
            # print("ICMP -> Type: %d Code: %d" % (icmp_header.type, icmp_header.code))
            # 收到检查类型为3,代码为3的ICMP包则说明目标主机存在
            if icmp_header.type == 3 and icmp_header.code == 3:
                # 确认响应的主机在我们的目标子网内
                if IPAddress(ip_header.src_address) in IPNetwork(subnet):
                    # 确认ICMP数据中包含我们发送的自定义字符串
                    if raw_buffer[len(raw_buffer)-len(magic_message):] == magic_message:
                        print("Host Up: %s" % ip_header.src_address)
except KeyboardInterrupt:
    if os.name == 'nt':
        sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)

示例中大部分代码跟之前相同,这里增加了netaddr模块,定义了udp_sender()函数对某个定义的网段发送udp数据包。然后在主函数内开启多线程,再把接受到的所有数据包进行筛选过滤,最后就可以得到存活主机的名单。 下面就是程序运行的结果:

在这里插入图片描述

【本章完结】
#

第二章 网络基础 传送门:https://blog.csdn.net/qq_40549070/article/details/108193537

《Python黑帽子》 - 这篇文章属于一个选集。
§ : 本文