第一篇:NAT 基础原理与 SNAT 实现
运行前提条件
- 系统:Linux 系统(Ubuntu 20.04+ / Debian 11+ / CentOS 8+ 均可),Windows 用户用 WSL2 也可以
- 权限:必须有 sudo /root 权限(操作内核网络模块需要)
- 环境:Python 3.8+,不需要装任何第三方库,纯标准库就能跑
- 基础:懂一点 Linux 基础命令和 Python 语法就行,零基础跟着步骤也能跑通
一、学习目标
- 理解 NAT(Network Address Translation) 的核心原理
- 掌握 SNAT(Source NAT) 的工作机制
- 用 Python 代码实现一个完整的 SNAT 功能
- 理解 NAT 表、连接跟踪等核心概念
二、核心原理
2.1 什么是 NAT?
NAT(Network Address Translation),即网络地址转换,是一种在数据包经过网络设备时,修改数据包头部 IP 地址的技术。
生活中的例子
想象你住在一个公寓楼里:
- 你的房间号:101(内网 IP:192.168.1.100)
- 公寓楼地址:XX路XX号(公网 IP:123.56.78.90)
当你往外寄快递时:
- 你写的寄件地址是:XX路XX号 101室
- 快递公司看到的地址是:XX路XX号(不关心具体哪个房间)
- 回信会寄到:XX路XX号
- 公寓前台会把回信转交给 101室
这就是 NAT 的核心思想:用一个公网 IP 代表整个内网。
2.2 为什么需要 NAT?
问题 1:IPv4 地址不够用
IPv4 只有 32 位,最多只能表示约 43 亿个地址。全球设备数量早就超过这个数字了。
解决方案:内网设备共用一个公网 IP
问题 2:安全防护
如果每台内网设备都有公网 IP,就会直接暴露在公网上,容易被攻击。
解决方案:NAT 天然隐藏内网结构
2.3 SNAT 工作流程
SNAT(Source NAT) 修改的是数据包的源 IP 地址,方向是内网 → 公网。
完整流程图
2.4 iptables 中的 SNAT 实现
在 Linux 中,SNAT 通过 iptables 的 nat 表实现:
# 语法
iptables -t nat -A POSTROUTING -s <内网网段> -o <公网接口> -j MASQUERADE
# 示例:让 192.168.1.0/24 网段通过 eth0 访问公网
iptables -t nat -A POSTROUTING -s 192.168.1.0/24 -o eth0 -j MASQUERADE关键参数解析
| 参数 | 含义 |
|---|---|
-t nat | 操作 nat 表 |
-A POSTROUTING | 在数据包离开本机前处理(修改源 IP) |
-s 192.168.1.0/24 | 匹配来自该网段的数据包 |
-o eth0 | 从 eth0 接口出去的数据包 |
-j MASQUERADE | 将源 IP 替换为 eth0 的 IP |
2.5 连接跟踪(Conntrack)
NAT 不是简单地修改每个数据包,而是需要记住转换关系,以便回程数据包能正确还原。
这就是 连接跟踪(Connection Tracking) 的作用:
为什么需要连接跟踪?
- 去程:内网 → 公网,修改源 IP
- 回程:公网 → 内网,需要知道还原成哪个内网 IP
没有连接跟踪,回程数据包就不知道该发给谁了。
三、代码分段解析
现在我们用 Python 代码实现一个完整的 SNAT 功能。我们将模拟以下场景:
3.1 数据结构定义
做什么:定义数据包、NAT 转换表条目等核心数据结构。
为什么这么做:清晰的数据结构是实现 NAT 网关的基础,帮助我们理解数据包的流转过程。
from dataclasses import dataclass, field
from typing import Dict, Optional
from enum import Enum
import random
import time
class ConnectionState(Enum):
"""连接状态"""
NEW = "NEW" # 新建连接
ESTABLISHED = "ESTABLISHED" # 已建立
RELATED = "RELATED" # 相关连接
CLOSED = "CLOSED" # 已关闭
@dataclass
class Packet:
"""数据包"""
src_ip: str # 源 IP
dst_ip: str # 目的 IP
src_port: int # 源端口
dst_port: int # 目的端口
protocol: str = "tcp" # 协议
payload: str = "" # 负载数据
def __str__(self):
return f"[{self.protocol}] {self.src_ip}:{self.src_port} → {self.dst_ip}:{self.dst_port}"
@dataclass
class NatEntry:
"""NAT 转换表条目"""
original_src_ip: str # 原始源 IP
original_src_port: int # 原始源端口
translated_src_ip: str # 转换后的源 IP
translated_src_port: int # 转换后的源端口
dst_ip: str # 目的 IP
dst_port: int # 目的端口
state: ConnectionState = ConnectionState.NEW
created_at: float = field(default_factory=time.time)
last_seen: float = field(default_factory=time.time)
@property
def original_key(self) -> str:
"""原始连接的唯一标识(四元组)"""
return f"{self.original_src_ip}:{self.original_src_port}-{self.dst_ip}:{self.dst_port}"
@property
def translated_key(self) -> str:
"""转换后连接的唯一标识"""
return f"{self.translated_src_ip}:{self.translated_src_port}-{self.dst_ip}:{self.dst_port}"关键要点:
- 使用
@dataclass装饰器简化数据类的定义 ConnectionState枚举表示连接的生命周期状态NatEntry通过四元组(源IP:源端口-目的IP:目的端口)唯一标识一个连接
3.2 端口分配器
做什么:模拟 NAT 网关的端口分配功能,为每个内网连接分配一个唯一的公网端口。
为什么这么做:多个内网设备共用一个公网 IP 时,需要通过不同的端口号来区分不同的连接。
class PortAllocator:
"""端口分配器"""
def __init__(self, start: int = 10000, end: int = 60000):
self.start = start
self.end = end
self.used_ports = set()
def allocate(self) -> Optional[int]:
"""分配一个可用端口"""
available = set(range(self.start, self.end + 1)) - self.used_ports
if not available:
return None
port = random.choice(list(available))
self.used_ports.add(port)
return port
def release(self, port: int):
"""释放一个端口"""
self.used_ports.discard(port)关键要点:
- 端口范围通常设置为 10000-60000,避免与系统端口冲突
- 使用集合(set)快速判断端口是否可用
- 这就是为什么多个内网 IP 可以共用一个公网 IP 的核心原理
3.3 SNAT 网关核心实现
做什么:实现 SNAT 网关的核心功能,包括出站数据包的源地址转换和入站数据包的反向转换。
为什么这么做:这是 NAT 网关的核心逻辑,理解它就理解了 SNAT 的工作原理。
class SNATGateway:
"""
SNAT 网关实现
功能:将内网 IP 转换为公网 IP,实现内网访问公网
"""
def __init__(self, name: str, internal_ip: str, external_ip: str):
self.name = name
self.internal_ip = internal_ip
self.external_ip = external_ip
# NAT 转换表:原始连接 → 转换后连接
self.nat_table: Dict[str, NatEntry] = {}
# 反向映射:转换后连接 → 原始连接(用于回程数据包的还原)
self.reverse_table: Dict[str, str] = {}
# 端口分配器
self.port_allocator = PortAllocator(start=10000, end=60000)
# 统计信息
self.stats = {
'packets_translated': 0,
'packets_dropped': 0,
'active_connections': 0
}
print(f"✅ SNAT 网关 [{name}] 创建成功")
print(f" 内网接口: {internal_ip}")
print(f" 公网接口: {external_ip}")
def translate_outbound(self, packet: Packet) -> Optional[Packet]:
"""
处理出站数据包(内网 → 公网)
核心:修改数据包的源 IP 地址
"""
print(f"\n📤 处理出站数据包: {packet}")
# 1. 检查是否已有 NAT 条目(连接复用)
entry_key = f"{packet.src_ip}:{packet.src_port}-{packet.dst_ip}:{packet.dst_port}"
if entry_key in self.nat_table:
# 已有条目,更新状态
entry = self.nat_table[entry_key]
entry.last_seen = time.time()
entry.state = ConnectionState.ESTABLISHED
print(f" ✅ 命中已有 NAT 条目: {entry.translated_src_ip}:{entry.translated_src_port}")
else:
# 2. 新建条目:分配端口并记录转换关系
translated_port = self.port_allocator.allocate()
if translated_port is None:
print(f" ❌ 端口耗尽,丢弃数据包")
self.stats['packets_dropped'] += 1
return None
entry = NatEntry(
original_src_ip=packet.src_ip,
original_src_port=packet.src_port,
translated_src_ip=self.external_ip,
translated_src_port=translated_port,
dst_ip=packet.dst_ip,
dst_port=packet.dst_port,
state=ConnectionState.NEW
)
self.nat_table[entry_key] = entry
self.reverse_table[entry.translated_key] = entry_key
self.stats['active_connections'] += 1
print(f" ✅ 新建 NAT 条目:")
print(f" {packet.src_ip}:{packet.src_port} → {self.external_ip}:{translated_port}")
# 3. 创建转换后的数据包(修改源 IP)
translated_packet = Packet(
src_ip=self.external_ip, # 源 IP 改为公网 IP
dst_ip=packet.dst_ip, # 目的 IP 不变
src_port=entry.translated_src_port, # 源端口改为分配的端口
dst_port=packet.dst_port, # 目的端口不变
protocol=packet.protocol,
payload=packet.payload
)
self.stats['packets_translated'] += 1
print(f" 📦 转换后: {translated_packet}")
return translated_packet
def translate_inbound(self, packet: Packet) -> Optional[Packet]:
"""
处理入站数据包(公网 → 内网)
核心:将目的 IP 还原为原始内网 IP
"""
print(f"\n📥 处理入站数据包: {packet}")
# 1. 查找反向映射
reverse_key = f"{packet.dst_ip}:{packet.dst_port}-{packet.src_ip}:{packet.src_port}"
if reverse_key not in self.reverse_table:
print(f" ❌ 没有匹配的 NAT 条目,丢弃数据包")
self.stats['packets_dropped'] += 1
return None
# 2. 获取原始连接信息
original_key = self.reverse_table[reverse_key]
entry = self.nat_table[original_key]
# 更新状态
entry.last_seen = time.time()
entry.state = ConnectionState.ESTABLISHED
# 3. 创建转换后的数据包(还原目的 IP)
translated_packet = Packet(
src_ip=packet.src_ip, # 源 IP 不变
dst_ip=entry.original_src_ip, # 目的 IP 还原为内网 IP
src_port=packet.src_port, # 源端口不变
dst_port=entry.original_src_port, # 目的端口还原
protocol=packet.protocol,
payload=packet.payload
)
print(f" ✅ 反向 NAT 转换:")
print(f" {packet.dst_ip}:{packet.dst_port} → {entry.original_src_ip}:{entry.original_src_port}")
return translated_packet
def show_nat_table(self):
"""显示 NAT 转换表"""
print(f"\n{'='*60}")
print(f"📊 NAT 转换表 - {self.name}")
print(f"{'='*60}")
if not self.nat_table:
print(" (空)")
return
print(f"{'原始地址':<25} {'转换后地址':<25} {'状态':<12}")
print(f"{'-'*25} {'-'*25} {'-'*12}")
for entry in self.nat_table.values():
original = f"{entry.original_src_ip}:{entry.original_src_port}"
translated = f"{entry.translated_src_ip}:{entry.translated_src_port}"
state = entry.state.value
print(f"{original:<25} {translated:<25} {state:<12}")
def show_stats(self):
"""显示统计信息"""
print(f"\n{'='*60}")
print(f"📈 统计信息 - {self.name}")
print(f"{'='*60}")
print(f" 已转换数据包: {self.stats['packets_translated']}")
print(f" 已丢弃数据包: {self.stats['packets_dropped']}")
print(f" 活跃连接数: {self.stats['active_connections']}")关键要点:
translate_outbound:出站时修改源 IP 为公网 IP,分配新端口translate_inbound:入站时根据端口还原目的 IP 为内网 IP- 使用双向映射表(nat_table 和 reverse_table)实现快速查找
3.4 模拟网络设备
做什么:模拟内网主机和公网服务器,用于演示 SNAT 的工作流程。
为什么这么做:通过模拟真实的网络设备,可以直观地观察数据包的转换过程。
class Host:
"""模拟网络主机"""
def __init__(self, name: str, ip: str, gateway: str = None):
self.name = name
self.ip = ip
self.gateway = gateway
self.port_counter = 10000
print(f"✅ 主机 [{name}] 创建成功,IP: {ip}")
def create_packet(self, dst_ip: str, dst_port: int, payload: str = "") -> Packet:
"""创建一个数据包"""
self.port_counter += 1
return Packet(
src_ip=self.ip,
dst_ip=dst_ip,
src_port=self.port_counter,
dst_port=dst_port,
payload=payload
)
def receive_packet(self, packet: Packet):
"""接收一个数据包"""
print(f" 📥 [{self.name}] 收到数据包: {packet}")
if packet.payload:
print(f" 负载: {packet.payload}")3.5 交互式 CLI Shell
做什么:用 Python 自带的 cmd 模块构建一个类似网络设备路由器的交互式命令行界面,替代硬编码的演示逻辑。
为什么这么做:让读者可以在终端里自由、动态地创建网络资源并验证连通性,带来真实的沉浸感操作体验。
import cmd
import shlex
class SNATShell(cmd.Cmd):
intro = '===========================================================\n' \
'欢迎使用 SNAT 交互式模拟器\n' \
'【可用指令】:\n' \
' 1. create_gateway <name> <internal_ip> <external_ip> - 创建 NAT 网关\n' \
' 2. create_host <name> <ip> - 创建主机\n' \
' 3. snat <host_name> <dst_ip> <dst_port> - 测试 SNAT(内网→公网)\n' \
' 4. show_nat - 查看 NAT 转换表\n' \
' 5. show_stats - 查看统计信息\n' \
' 6. reset - 重置所有资源\n' \
' 7. exit - 退出\n' \
'==========================================================='
prompt = '(SNAT-Sim) '
def emptyline(self):
pass
def do_create_gateway(self, arg):
"""创建 NAT 网关: create_gateway <name> <internal_ip> <external_ip>"""
args = shlex.split(arg)
if len(args) != 3:
print("❌ 用法: create_gateway <name> <internal_ip> <external_ip>")
print(" 示例: create_gateway nat-gw 10.0.0.1 203.0.113.1")
return
try:
gateway = SNATGateway(args[0], args[1], args[2])
ALL_GATEWAYS[args[0]] = gateway
except Exception as e:
print(f"❌ 创建失败: {e}")
def do_create_host(self, arg):
"""创建主机: create_host <name> <ip>"""
args = shlex.split(arg)
if len(args) != 2:
print("❌ 用法: create_host <name> <ip>")
print(" 示例: create_host web-server 180.101.50.24")
return
try:
host = Host(args[0], args[1])
ALL_HOSTS[args[0]] = host
except Exception as e:
print(f"❌ 创建失败: {e}")
def do_snat(self, arg):
"""测试 SNAT: snat <host_name> <dst_ip> <dst_port>"""
args = shlex.split(arg)
if len(args) < 3:
print("❌ 用法: snat <host_name> <dst_ip> <dst_port>")
print(" 示例: snat internal-pc 180.101.50.24 80")
return
host_name, dst_ip, dst_port = args[0], args[1], int(args[2])
# 查找主机
host = ALL_HOSTS.get(host_name)
if not host:
print(f"❌ 主机 {host_name} 不存在")
return
# 查找 NAT 网关(自动选择第一个)
if not ALL_GATEWAYS:
print("❌ 没有可用的 NAT 网关,请先创建")
return
gateway = list(ALL_GATEWAYS.values())[0]
# 创建数据包并进行 SNAT 转换
request = host.create_packet(dst_ip, dst_port, f"Ping from {host_name}")
translated_request = gateway.translate_outbound(request)
if translated_request:
print(f"\n✅ [出站] SNAT 转换成功!")
print(f" 原始请求: {host.ip}:{request.src_port} → {dst_ip}:{dst_port}")
print(f" 转换请求: {gateway.external_ip}:{translated_request.src_port} → {dst_ip}:{dst_port}")
# 模拟公网服务器返回响应
response = Packet(
src_ip=dst_ip, dst_ip=translated_request.src_ip,
src_port=dst_port, dst_port=translated_request.src_port,
payload=f"Response to {host_name}"
)
print(f"\n🌍 公网服务器 {dst_ip}:{dst_port} 处理请求并返回响应...")
# NAT 网关处理入站响应
translated_response = gateway.translate_inbound(response)
if translated_response:
print(f"✅ [入站] 反向 SNAT 转换成功!")
print(f" 原始响应: {response.src_ip}:{response.src_port} → {response.dst_ip}:{response.dst_port}")
print(f" 转换响应: {translated_response.src_ip}:{translated_response.src_port} → {translated_response.dst_ip}:{translated_response.dst_port}")
print(f"🎉 {host_name} 成功收到外网响应!")
else:
print(f"❌ [入站] 反向 SNAT 转换失败(无法找到连接记录)")
else:
print(f"\n❌ [出站] SNAT 转换失败")
def do_show_nat(self, arg):
"""查看 NAT 转换表: show_nat"""
if not ALL_GATEWAYS:
print("❌ 没有可用的 NAT 网关")
return
for gateway in ALL_GATEWAYS.values():
gateway.show_nat_table()
def do_show_stats(self, arg):
"""查看统计信息: show_stats"""
if not ALL_GATEWAYS:
print("❌ 没有可用的 NAT 网关")
return
for gateway in ALL_GATEWAYS.values():
gateway.show_stats()
def do_reset(self, arg):
"""重置所有资源: reset"""
ALL_GATEWAYS.clear()
ALL_HOSTS.clear()
print("✅ 所有资源已重置")
def do_exit(self, arg):
"""退出: exit"""
print("👋 再见!")
return True
if __name__ == "__main__":
SNATShell().cmdloop()一键体验配置:
将以下指令粘贴进模拟器,可以直接体验 SNAT 核心功能:
create_gateway nat-gw 10.0.0.1 203.0.113.1
create_host web-server 180.101.50.24
create_host pc-1 10.0.1.10
create_host pc-2 10.0.1.11
snat pc-1 180.101.50.24 80
snat pc-2 180.101.50.24 80
show_nat四、完整可运行代码
将上面所有代码片段合并,保存为 snat_demo.py 即可运行。完整代码如下:
#!/usr/bin/env python3
"""
SNAT 模拟器 - 深入理解源地址转换的工作原理
用纯 Python 实现一个简化的 SNAT 网关
"""
from dataclasses import dataclass, field
from typing import Dict, Optional
from enum import Enum
import random
import time
class ConnectionState(Enum):
"""连接状态"""
NEW = "NEW" # 新建连接
ESTABLISHED = "ESTABLISHED" # 已建立
RELATED = "RELATED" # 相关连接
CLOSED = "CLOSED" # 已关闭
@dataclass
class Packet:
"""数据包"""
src_ip: str # 源 IP
dst_ip: str # 目的 IP
src_port: int # 源端口
dst_port: int # 目的端口
protocol: str = "tcp" # 协议
payload: str = "" # 负载数据
def __str__(self):
return f"[{self.protocol}] {self.src_ip}:{self.src_port} → {self.dst_ip}:{self.dst_port}"
@dataclass
class NatEntry:
"""NAT 转换表条目"""
original_src_ip: str # 原始源 IP
original_src_port: int # 原始源端口
translated_src_ip: str # 转换后的源 IP
translated_src_port: int # 转换后的源端口
dst_ip: str # 目的 IP
dst_port: int # 目的端口
state: ConnectionState = ConnectionState.NEW
created_at: float = field(default_factory=time.time)
last_seen: float = field(default_factory=time.time)
@property
def original_key(self) -> str:
"""原始连接的唯一标识(四元组)"""
return f"{self.original_src_ip}:{self.original_src_port}-{self.dst_ip}:{self.dst_port}"
@property
def translated_key(self) -> str:
"""转换后连接的唯一标识"""
return f"{self.translated_src_ip}:{self.translated_src_port}-{self.dst_ip}:{self.dst_port}"
class PortAllocator:
"""端口分配器"""
def __init__(self, start: int = 10000, end: int = 60000):
self.start = start
self.end = end
self.used_ports = set()
def allocate(self) -> Optional[int]:
"""分配一个可用端口"""
available = set(range(self.start, self.end + 1)) - self.used_ports
if not available:
return None
port = random.choice(list(available))
self.used_ports.add(port)
return port
def release(self, port: int):
"""释放一个端口"""
self.used_ports.discard(port)
class SNATGateway:
"""
SNAT 网关实现
功能:将内网 IP 转换为公网 IP,实现内网访问公网
"""
def __init__(self, name: str, internal_ip: str, external_ip: str):
self.name = name
self.internal_ip = internal_ip
self.external_ip = external_ip
# NAT 转换表:原始连接 → 转换后连接
self.nat_table: Dict[str, NatEntry] = {}
# 反向映射:转换后连接 → 原始连接(用于回程数据包的还原)
self.reverse_table: Dict[str, str] = {}
# 端口分配器
self.port_allocator = PortAllocator(start=10000, end=60000)
# 统计信息
self.stats = {
'packets_translated': 0,
'packets_dropped': 0,
'active_connections': 0
}
print(f"✅ SNAT 网关 [{name}] 创建成功")
print(f" 内网接口: {internal_ip}")
print(f" 公网接口: {external_ip}")
def translate_outbound(self, packet: Packet) -> Optional[Packet]:
"""
处理出站数据包(内网 → 公网)
核心:修改数据包的源 IP 地址
"""
print(f"\n📤 处理出站数据包: {packet}")
# 1. 检查是否已有 NAT 条目(连接复用)
entry_key = f"{packet.src_ip}:{packet.src_port}-{packet.dst_ip}:{packet.dst_port}"
if entry_key in self.nat_table:
# 已有条目,更新状态
entry = self.nat_table[entry_key]
entry.last_seen = time.time()
entry.state = ConnectionState.ESTABLISHED
print(f" ✅ 命中已有 NAT 条目: {entry.translated_src_ip}:{entry.translated_src_port}")
else:
# 2. 新建条目:分配端口并记录转换关系
translated_port = self.port_allocator.allocate()
if translated_port is None:
print(f" ❌ 端口耗尽,丢弃数据包")
self.stats['packets_dropped'] += 1
return None
entry = NatEntry(
original_src_ip=packet.src_ip,
original_src_port=packet.src_port,
translated_src_ip=self.external_ip,
translated_src_port=translated_port,
dst_ip=packet.dst_ip,
dst_port=packet.dst_port,
state=ConnectionState.NEW
)
self.nat_table[entry_key] = entry
self.reverse_table[entry.translated_key] = entry_key
self.stats['active_connections'] += 1
print(f" ✅ 新建 NAT 条目:")
print(f" {packet.src_ip}:{packet.src_port} → {self.external_ip}:{translated_port}")
# 3. 创建转换后的数据包(修改源 IP)
translated_packet = Packet(
src_ip=self.external_ip, # 源 IP 改为公网 IP
dst_ip=packet.dst_ip, # 目的 IP 不变
src_port=entry.translated_src_port, # 源端口改为分配的端口
dst_port=packet.dst_port, # 目的端口不变
protocol=packet.protocol,
payload=packet.payload
)
self.stats['packets_translated'] += 1
print(f" 📦 转换后: {translated_packet}")
return translated_packet
def translate_inbound(self, packet: Packet) -> Optional[Packet]:
"""
处理入站数据包(公网 → 内网)
核心:将目的 IP 还原为原始内网 IP
"""
print(f"\n📥 处理入站数据包: {packet}")
# 1. 查找反向映射
reverse_key = f"{packet.dst_ip}:{packet.dst_port}-{packet.src_ip}:{packet.src_port}"
if reverse_key not in self.reverse_table:
print(f" ❌ 没有匹配的 NAT 条目,丢弃数据包")
self.stats['packets_dropped'] += 1
return None
# 2. 获取原始连接信息
original_key = self.reverse_table[reverse_key]
entry = self.nat_table[original_key]
# 更新状态
entry.last_seen = time.time()
entry.state = ConnectionState.ESTABLISHED
# 3. 创建转换后的数据包(还原目的 IP)
translated_packet = Packet(
src_ip=packet.src_ip, # 源 IP 不变
dst_ip=entry.original_src_ip, # 目的 IP 还原为内网 IP
src_port=packet.src_port, # 源端口不变
dst_port=entry.original_src_port, # 目的端口还原
protocol=packet.protocol,
payload=packet.payload
)
print(f" ✅ 反向 NAT 转换:")
print(f" {packet.dst_ip}:{packet.dst_port} → {entry.original_src_ip}:{entry.original_src_port}")
return translated_packet
def show_nat_table(self):
"""显示 NAT 转换表"""
print(f"\n{'='*60}")
print(f"📊 NAT 转换表 - {self.name}")
print(f"{'='*60}")
if not self.nat_table:
print(" (空)")
return
print(f"{'原始地址':<25} {'转换后地址':<25} {'状态':<12}")
print(f"{'-'*25} {'-'*25} {'-'*12}")
for entry in self.nat_table.values():
original = f"{entry.original_src_ip}:{entry.original_src_port}"
translated = f"{entry.translated_src_ip}:{entry.translated_src_port}"
state = entry.state.value
print(f"{original:<25} {translated:<25} {state:<12}")
def show_stats(self):
"""显示统计信息"""
print(f"\n{'='*60}")
print(f"📈 统计信息 - {self.name}")
print(f"{'='*60}")
print(f" 已转换数据包: {self.stats['packets_translated']}")
print(f" 已丢弃数据包: {self.stats['packets_dropped']}")
print(f" 活跃连接数: {self.stats['active_connections']}")
class Host:
"""模拟网络主机"""
def __init__(self, name: str, ip: str, gateway: str = None):
self.name = name
self.ip = ip
self.gateway = gateway
self.port_counter = 10000
print(f"✅ 主机 [{name}] 创建成功,IP: {ip}")
def create_packet(self, dst_ip: str, dst_port: int, payload: str = "") -> Packet:
"""创建一个数据包"""
self.port_counter += 1
return Packet(
src_ip=self.ip,
dst_ip=dst_ip,
src_port=self.port_counter,
dst_port=dst_port,
payload=payload
)
def receive_packet(self, packet: Packet):
"""接收一个数据包"""
print(f" 📥 [{self.name}] 收到数据包: {packet}")
if packet.payload:
print(f" 负载: {packet.payload}")
# ============================================================
# 交互式 CLI Shell
# ============================================================
import cmd
import shlex
# 全局资源
ALL_GATEWAYS: Dict[str, SNATGateway] = {}
ALL_HOSTS: Dict[str, Host] = {}
class SNATShell(cmd.Cmd):
intro = '===========================================================\n' \
'欢迎使用 SNAT 交互式模拟器\n' \
'【可用指令】:\n' \
' 1. create_gateway <name> <internal_ip> <external_ip> - 创建 NAT 网关\n' \
' 2. create_host <name> <ip> - 创建主机\n' \
' 3. snat <host_name> <dst_ip> <dst_port> - 测试 SNAT(内网→公网)\n' \
' 4. show_nat - 查看 NAT 转换表\n' \
' 5. show_stats - 查看统计信息\n' \
' 6. reset - 重置所有资源\n' \
' 7. exit - 退出\n' \
'==========================================================='
prompt = '(SNAT-Sim) '
def emptyline(self):
pass
def do_create_gateway(self, arg):
"""创建 NAT 网关: create_gateway <name> <internal_ip> <external_ip>"""
args = shlex.split(arg)
if len(args) != 3:
print("❌ 用法: create_gateway <name> <internal_ip> <external_ip>")
print(" 示例: create_gateway nat-gw 10.0.0.1 203.0.113.1")
return
try:
gateway = SNATGateway(args[0], args[1], args[2])
ALL_GATEWAYS[args[0]] = gateway
except Exception as e:
print(f"❌ 创建失败: {e}")
def do_create_host(self, arg):
"""创建主机: create_host <name> <ip>"""
args = shlex.split(arg)
if len(args) != 2:
print("❌ 用法: create_host <name> <ip>")
print(" 示例: create_host web-server 180.101.50.24")
return
try:
host = Host(args[0], args[1])
ALL_HOSTS[args[0]] = host
except Exception as e:
print(f"❌ 创建失败: {e}")
def do_snat(self, arg):
"""测试 SNAT: snat <host_name> <dst_ip> <dst_port>"""
args = shlex.split(arg)
if len(args) < 3:
print("❌ 用法: snat <host_name> <dst_ip> <dst_port>")
print(" 示例: snat internal-pc 180.101.50.24 80")
return
host_name, dst_ip, dst_port = args[0], args[1], int(args[2])
# 查找主机
host = ALL_HOSTS.get(host_name)
if not host:
print(f"❌ 主机 {host_name} 不存在")
return
# 查找 NAT 网关(自动选择第一个)
if not ALL_GATEWAYS:
print("❌ 没有可用的 NAT 网关,请先创建")
return
gateway = list(ALL_GATEWAYS.values())[0]
# 创建数据包并进行 SNAT 转换
request = host.create_packet(dst_ip, dst_port, f"Ping from {host_name}")
translated = gateway.translate_outbound(request)
if translated:
print(f"\n✅ SNAT 转换成功!")
print(f" 原始: {host.ip}:{request.src_port} → {dst_ip}:{dst_port}")
print(f" 转换: {gateway.external_ip}:{translated.src_port} → {dst_ip}:{dst_port}")
else:
print(f"\n❌ SNAT 转换失败")
def do_show_nat(self, arg):
"""查看 NAT 转换表: show_nat"""
if not ALL_GATEWAYS:
print("❌ 没有可用的 NAT 网关")
return
for gateway in ALL_GATEWAYS.values():
gateway.show_nat_table()
def do_show_stats(self, arg):
"""查看统计信息: show_stats"""
if not ALL_GATEWAYS:
print("❌ 没有可用的 NAT 网关")
return
for gateway in ALL_GATEWAYS.values():
gateway.show_stats()
def do_reset(self, arg):
"""重置所有资源: reset"""
ALL_GATEWAYS.clear()
ALL_HOSTS.clear()
print("✅ 所有资源已重置")
def do_exit(self, arg):
"""退出: exit"""
print("👋 再见!")
return True
if __name__ == "__main__":
SNATShell().cmdloop()五、运行方法
- 将代码保存为
nat_part1_basic.py - 执行:
python3 nat_part1_basic.py- 运行结果
root@ubuntu:~# python3 nat_part1_basic.py
===========================================================
欢迎使用 SNAT 交互式模拟器
【可用指令】:
1. create_gateway <name> <internal_ip> <external_ip> - 创建 NAT 网关
2. create_host <name> <ip> - 创建主机
3. snat <host_name> <dst_ip> <dst_port> - 测试 SNAT(内网→公网)
4. show_nat - 查看 NAT 转换表
5. show_stats - 查看统计信息
6. reset - 重置所有资源
7. exit - 退出
===========================================================
(SNAT-Sim) create_gateway nat-gw 10.0.0.1 203.0.113.1
✅ SNAT 网关 [nat-gw] 创建成功
内网接口: 10.0.0.1
公网接口: 203.0.113.1
(SNAT-Sim) create_host web-server 180.101.50.24
✅ 主机 [web-server] 创建成功,IP: 180.101.50.24
(SNAT-Sim) create_host pc-1 10.0.1.10
✅ 主机 [pc-1] 创建成功,IP: 10.0.1.10
(SNAT-Sim) create_host pc-2 10.0.1.11
✅ 主机 [pc-2] 创建成功,IP: 10.0.1.11
(SNAT-Sim) snat pc-1 180.101.50.24 80
📤 处理出站数据包: [tcp] 10.0.1.10:10001 → 180.101.50.24:80
✅ 新建 NAT 条目:
10.0.1.10:10001 → 203.0.113.1:18112
📦 转换后: [tcp] 203.0.113.1:18112 → 180.101.50.24:80
✅ SNAT 转换成功!
原始: 10.0.1.10:10001 → 180.101.50.24:80
转换: 203.0.113.1:18112 → 180.101.50.24:80
(SNAT-Sim) snat pc-2 180.101.50.24 80
📤 处理出站数据包: [tcp] 10.0.1.11:10001 → 180.101.50.24:80
✅ 新建 NAT 条目:
10.0.1.11:10001 → 203.0.113.1:53383
📦 转换后: [tcp] 203.0.113.1:53383 → 180.101.50.24:80
✅ SNAT 转换成功!
原始: 10.0.1.11:10001 → 180.101.50.24:80
转换: 203.0.113.1:53383 → 180.101.50.24:80
(SNAT-Sim) show_nat
============================================================
📊 NAT 转换表 - nat-gw
============================================================
原始地址 转换后地址 状态
------------------------- ------------------------- ------------
10.0.1.10:10001 203.0.113.1:18112 NEW
10.0.1.11:10001 203.0.113.1:53383 NEW六、在线互动体验
下面是一个交互式网络拓扑演示,直观展示 SNAT 的工作流程:
演示说明:
- 点击 「SNAT: 内网 → 公网」 按钮,观察内网主机如何通过 NAT 网关访问公网
- 点击 「无 NAT: 内网 → 公网」 按钮,观察没有 NAT 时会发生什么
- 注意观察数据包的源 IP 地址是如何被转换的
常见问题
Q1: 为什么需要端口分配器?
A: 当多个内网设备共用一个公网 IP 时,需要通过不同的端口号来区分不同的连接。端口分配器就是为每个连接分配一个唯一端口的组件。
Q2: NAT 转换表有什么作用?
A: NAT 转换表记录了原始连接和转换后连接的映射关系。当公网服务器返回响应时,NAT 网关需要根据这个表来知道应该把数据包转发给哪个内网设备。
Q3: 为什么需要反向映射表?
A: 反向映射表(reverse_table)用于快速查找。当收到公网返回的数据包时,需要根据转换后的地址快速找到原始连接信息,这样才能正确还原目的 IP。
七、下一篇预告
在下一篇中,我们将学习:
- DNAT(目的地址转换) 的工作原理
- 如何让公网访问内网服务
- 端口映射的实现机制
- 完整的 DNAT 代码实现
