第三篇:NAT 网关完整实现与高级特性
运行前提条件
- 系统:Linux 系统(Ubuntu 20.04+ / Debian 11+ / CentOS 8+ 均可),Windows 用户用 WSL2 也可以
- 权限:必须有 sudo /root 权限(操作内核网络模块需要)
- 环境:Python 3.8+,不需要装任何第三方库,纯标准库就能跑
- 基础:懂一点 Linux 基础命令和 Python 语法就行,零基础跟着步骤也能跑通
一、学习目标
- 实现一个功能完整的 NAT 网关
- 深入理解连接跟踪(Conntrack) 的工作原理
- 了解 NAT 穿透技术
- 掌握生产环境中的 NAT 最佳实践
二、NAT 网关完整架构
在前两篇中,我们分别实现了 SNAT 和 DNAT。现在我们将它们整合成一个完整的 NAT 网关。
2.1 架构图
2.2 核心组件
- 连接跟踪表:记录所有活动连接的转换关系
- SNAT 规则表:定义内网到公网的地址转换规则
- DNAT 规则表:定义公网到内网的端口映射规则
- 数据包处理引擎:根据规则转换数据包
三、代码分段解析
我们将基于前面的基础,实现一个集成了 SNAT、DNAT 和连接跟踪的完整 NAT 网关。
3.1 数据结构定义
做什么:定义连接状态、协议、数据包、NAT 规则和连接跟踪条目。
为什么这么做:一个完整的 NAT 网关需要处理不同协议,同时管理 SNAT 和 DNAT 的规则,连接跟踪条目也需要记录更丰富的信息(如收发字节数、过期时间)。
import time
import threading
from dataclasses import dataclass, field
from typing import Dict, Optional, List, Set, Tuple
from enum import Enum
from collections import defaultdict
# ============================================================
# 数据结构定义
# ============================================================
class ConnectionState(Enum):
"""连接状态"""
NEW = "NEW"
ESTABLISHED = "ESTABLISHED"
RELATED = "RELATED"
CLOSED = "CLOSED"
TIME_WAIT = "TIME_WAIT"
class Protocol(Enum):
"""协议类型"""
TCP = "tcp"
UDP = "udp"
ICMP = "icmp"
@dataclass
class Packet:
"""数据包"""
src_ip: str
dst_ip: str
src_port: int
dst_port: int
protocol: str = "tcp"
payload: str = ""
timestamp: float = field(default_factory=time.time)
@property
def connection_key(self) -> str:
"""连接的唯一标识(四元组)"""
return f"{self.src_ip}:{self.src_port}-{self.dst_ip}:{self.dst_port}-{self.protocol}"
def __str__(self):
return f"[{self.protocol}] {self.src_ip}:{self.src_port} → {self.dst_ip}:{self.dst_port}"
@dataclass
class NATRule:
"""NAT 规则"""
name: str
rule_type: str # "snat" 或 "dnat"
src_cidr: str = "" # 源网段(SNAT)
dst_ip: str = "" # 目的 IP(DNAT)
dst_port: int = 0 # 目的端口(DNAT)
translated_ip: str = "" # 转换后的 IP
translated_port: int = 0 # 转换后的端口
protocol: str = "tcp"
priority: int = 100 # 优先级(数字越小优先级越高)
enabled: bool = True
def matches_snat(self, packet: Packet) -> bool:
"""检查是否匹配 SNAT 规则"""
if self.rule_type != "snat" or not self.enabled:
return False
# 简化的 CIDR 匹配(实际应该使用 ipaddress 模块)
return packet.src_ip.startswith(self.src_cidr.rsplit('.', 1)[0])
def matches_dnat(self, packet: Packet) -> bool:
"""检查是否匹配 DNAT 规则"""
if self.rule_type != "dnat" or not self.enabled:
return False
return (
packet.dst_ip == self.dst_ip and
packet.dst_port == self.dst_port and
packet.protocol == self.protocol
)
@dataclass
class ConnectionEntry:
"""连接跟踪条目"""
# 原始连接信息
original_src_ip: str
original_src_port: int
original_dst_ip: str
original_dst_port: int
protocol: str
# 转换后的连接信息
translated_src_ip: str
translated_src_port: int
translated_dst_ip: str
translated_dst_port: int
# 连接状态
state: ConnectionState = ConnectionState.NEW
nat_type: str = "" # "snat", "dnat", "both"
# 时间信息
created_at: float = field(default_factory=time.time)
last_seen: float = field(default_factory=time.time)
bytes_sent: int = 0
bytes_received: int = 0
@property
def original_key(self) -> str:
"""原始连接的唯一标识"""
return f"{self.original_src_ip}:{self.original_src_port}-{self.original_dst_ip}:{self.original_dst_port}-{self.protocol}"
@property
def translated_key(self) -> str:
"""转换后连接的唯一标识"""
return f"{self.translated_src_ip}:{self.translated_src_port}-{self.translated_dst_ip}:{self.translated_dst_port}-{self.protocol}"
def update_activity(self):
"""更新活动时间"""
self.last_seen = time.time()
self.state = ConnectionState.ESTABLISHED
def is_expired(self, timeout: int = 300) -> bool:
"""检查连接是否过期"""
return time.time() - self.last_seen > timeout3.2 连接跟踪管理器
做什么:实现 ConnectionTracker 类,负责管理所有活动的连接,提供双向查找能力,并定期清理过期连接。
为什么这么做:连接跟踪是 NAT 的核心。网关需要为每个通过的数据包匹配连接条目,并在后台自动清理长时间未活动的连接以释放内存。
# ============================================================
# 连接跟踪管理器
# ============================================================
class ConnectionTracker:
"""连接跟踪管理器"""
def __init__(self, timeout: int = 300):
"""
初始化连接跟踪管理器
Args:
timeout: 连接超时时间(秒)
"""
self.connections: Dict[str, ConnectionEntry] = {}
self.reverse_connections: Dict[str, str] = {}
self.timeout = timeout
self.lock = threading.Lock()
# 统计信息
self.stats = {
'total_connections': 0,
'active_connections': 0,
'expired_connections': 0
}
# 启动清理线程
self._start_cleanup_thread()
def _start_cleanup_thread(self):
"""启动过期连接清理线程"""
def cleanup_loop():
while True:
time.sleep(60) # 每分钟清理一次
self.cleanup_expired()
thread = threading.Thread(target=cleanup_loop, daemon=True)
thread.start()
def add_connection(self, entry: ConnectionEntry):
"""添加连接"""
with self.lock:
self.connections[entry.original_key] = entry
self.reverse_connections[entry.translated_key] = entry.original_key
self.stats['total_connections'] += 1
self.stats['active_connections'] += 1
def find_by_original(self, key: str) -> Optional[ConnectionEntry]:
"""通过原始连接查找"""
with self.lock:
entry = self.connections.get(key)
if entry:
entry.update_activity()
return entry
def find_by_translated(self, key: str) -> Optional[ConnectionEntry]:
"""通过转换后连接查找"""
with self.lock:
original_key = self.reverse_connections.get(key)
if original_key:
entry = self.connections.get(original_key)
if entry:
entry.update_activity()
return entry
return None
def cleanup_expired(self):
"""清理过期连接"""
with self.lock:
expired_keys = []
for key, entry in self.connections.items():
if entry.is_expired(self.timeout):
expired_keys.append(key)
for key in expired_keys:
entry = self.connections[key]
del self.connections[key]
if entry.translated_key in self.reverse_connections:
del self.reverse_connections[entry.translated_key]
self.stats['expired_connections'] += 1
self.stats['active_connections'] -= 1
if expired_keys:
print(f"🧹 清理了 {len(expired_keys)} 个过期连接")
def get_all_connections(self) -> List[ConnectionEntry]:
"""获取所有连接"""
with self.lock:
return list(self.connections.values())3.3 NAT 网关核心实现
做什么:实现 NATGateway,整合连接跟踪、SNAT 规则、DNAT 规则和数据包处理逻辑。
为什么这么做:这里是处理数据包的大脑。出站流量优先匹配已建立的连接,否则尝试匹配 SNAT 规则;入站流量同理,优先匹配连接,否则尝试匹配 DNAT 规则。
# ============================================================
# NAT 网关实现
# ============================================================
class NATGateway:
"""
完整 NAT 网关实现
功能:SNAT + DNAT + 连接跟踪
"""
def __init__(self, name: str, public_ip: str, private_ip: str):
"""
初始化 NAT 网关
Args:
name: 网关名称
public_ip: 公网 IP
private_ip: 内网 IP
"""
self.name = name
self.public_ip = public_ip
self.private_ip = private_ip
# 连接跟踪管理器
self.conntrack = ConnectionTracker(timeout=300)
# NAT 规则
self.snat_rules: List[NATRule] = []
self.dnat_rules: List[NATRule] = []
# 端口分配器
self.port_allocator = PortAllocator(start=10000, end=60000)
# 统计信息
self.stats = {
'packets_translated': 0,
'packets_dropped': 0,
'snat_translations': 0,
'dnat_translations': 0
}
print(f"✅ NAT 网关 [{name}] 创建成功")
print(f" 公网 IP: {public_ip}")
print(f" 内网 IP: {private_ip}")
def add_snat_rule(self, rule: NATRule):
"""添加 SNAT 规则"""
rule.rule_type = "snat"
self.snat_rules.append(rule)
self.snat_rules.sort(key=lambda r: r.priority)
print(f"✅ 添加 SNAT 规则: {rule.name}")
print(f" {rule.src_cidr} → {rule.translated_ip}")
def add_dnat_rule(self, rule: NATRule):
"""添加 DNAT 规则"""
rule.rule_type = "dnat"
self.dnat_rules.append(rule)
self.dnat_rules.sort(key=lambda r: r.priority)
print(f"✅ 添加 DNAT 规则: {rule.name}")
print(f" {rule.dst_ip}:{rule.dst_port} → {rule.translated_ip}:{rule.translated_port}")
def process_packet(self, packet: Packet, direction: str) -> Optional[Packet]:
"""
处理数据包
Args:
packet: 原始数据包
direction: 方向 ("inbound" 或 "outbound")
Returns:
转换后的数据包
"""
if direction == "inbound":
return self._process_inbound(packet)
else:
return self._process_outbound(packet)
def _process_inbound(self, packet: Packet) -> Optional[Packet]:
"""处理入站数据包(公网 → 内网)"""
print(f"\n📥 处理入站数据包: {packet}")
# 1. 检查是否已有连接
conn_key = f"{packet.src_ip}:{packet.src_port}-{packet.dst_ip}:{packet.dst_port}-{packet.protocol}"
existing_conn = self.conntrack.find_by_original(conn_key)
if existing_conn:
# 已有连接,直接使用
print(f" ✅ 命中已有连接")
translated = Packet(
src_ip=packet.src_ip,
dst_ip=existing_conn.translated_dst_ip,
src_port=packet.src_port,
dst_port=existing_conn.translated_dst_port,
protocol=packet.protocol,
payload=packet.payload
)
self.stats['packets_translated'] += 1
return translated
# 2. 查找匹配的 DNAT 规则
for rule in self.dnat_rules:
if rule.matches_dnat(packet):
print(f" ✅ 匹配 DNAT 规则: {rule.name}")
# 创建连接条目
entry = ConnectionEntry(
original_src_ip=packet.src_ip,
original_src_port=packet.src_port,
original_dst_ip=packet.dst_ip,
original_dst_port=packet.dst_port,
protocol=packet.protocol,
translated_src_ip=packet.src_ip,
translated_src_port=packet.src_port,
translated_dst_ip=rule.translated_ip,
translated_dst_port=rule.translated_port,
nat_type="dnat"
)
self.conntrack.add_connection(entry)
# 创建转换后的数据包
translated = Packet(
src_ip=packet.src_ip,
dst_ip=rule.translated_ip,
src_port=packet.src_port,
dst_port=rule.translated_port,
protocol=packet.protocol,
payload=packet.payload
)
self.stats['packets_translated'] += 1
self.stats['dnat_translations'] += 1
return translated
print(f" ❌ 没有匹配的规则,丢弃数据包")
self.stats['packets_dropped'] += 1
return None
def _process_outbound(self, packet: Packet) -> Optional[Packet]:
"""处理出站数据包(内网 → 公网)"""
print(f"\n📤 处理出站数据包: {packet}")
# 1. 检查是否已有连接
conn_key = f"{packet.src_ip}:{packet.src_port}-{packet.dst_ip}:{packet.dst_port}-{packet.protocol}"
existing_conn = self.conntrack.find_by_original(conn_key)
if existing_conn:
# 已有连接,直接使用
print(f" ✅ 命中已有连接")
translated = Packet(
src_ip=existing_conn.translated_src_ip,
dst_ip=packet.dst_ip,
src_port=existing_conn.translated_src_port,
dst_port=packet.dst_port,
protocol=packet.protocol,
payload=packet.payload
)
self.stats['packets_translated'] += 1
return translated
# 2. 查找匹配的 SNAT 规则
for rule in self.snat_rules:
if rule.matches_snat(packet):
print(f" ✅ 匹配 SNAT 规则: {rule.name}")
# 分配端口
translated_port = self.port_allocator.allocate()
if translated_port is None:
print(f" ❌ 端口耗尽,丢弃数据包")
self.stats['packets_dropped'] += 1
return None
# 创建连接条目
entry = ConnectionEntry(
original_src_ip=packet.src_ip,
original_src_port=packet.src_port,
original_dst_ip=packet.dst_ip,
original_dst_port=packet.dst_port,
protocol=packet.protocol,
translated_src_ip=rule.translated_ip,
translated_src_port=translated_port,
translated_dst_ip=packet.dst_ip,
translated_dst_port=packet.dst_port,
nat_type="snat"
)
self.conntrack.add_connection(entry)
# 创建转换后的数据包
translated = Packet(
src_ip=rule.translated_ip,
dst_ip=packet.dst_ip,
src_port=translated_port,
dst_port=packet.dst_port,
protocol=packet.protocol,
payload=packet.payload
)
self.stats['packets_translated'] += 1
self.stats['snat_translations'] += 1
return translated
# 3. 如果没有 SNAT 规则,直接转发
print(f" ⚠️ 没有匹配的 SNAT 规则,直接转发")
return packet
def show_rules(self):
"""显示所有规则"""
print(f"\n{'='*70}")
print(f"📋 NAT 规则列表 - {self.name}")
print(f"{'='*70}")
print(f"\n--- SNAT 规则 ---")
if not self.snat_rules:
print(" (空)")
else:
print(f"{'名称':<15} {'源网段':<20} {'转换IP':<20} {'优先级':<8} {'状态':<8}")
print(f"{'-'*15} {'-'*20} {'-'*20} {'-'*8} {'-'*8}")
for rule in self.snat_rules:
status = "启用" if rule.enabled else "禁用"
print(f"{rule.name:<15} {rule.src_cidr:<20} {rule.translated_ip:<20} {rule.priority:<8} {status:<8}")
print(f"\n--- DNAT 规则 ---")
if not self.dnat_rules:
print(" (空)")
else:
print(f"{'名称':<15} {'目的地址':<25} {'转换地址':<25} {'优先级':<8} {'状态':<8}")
print(f"{'-'*15} {'-'*25} {'-'*25} {'-'*8} {'-'*8}")
for rule in self.dnat_rules:
dst = f"{rule.dst_ip}:{rule.dst_port}"
translated = f"{rule.translated_ip}:{rule.translated_port}"
status = "启用" if rule.enabled else "禁用"
print(f"{rule.name:<15} {dst:<25} {translated:<25} {rule.priority:<8} {status:<8}")
def show_connections(self):
"""显示连接跟踪表"""
print(f"\n{'='*70}")
print(f"📊 连接跟踪表 - {self.name}")
print(f"{'='*70}")
connections = self.conntrack.get_all_connections()
if not connections:
print(" (空)")
return
print(f"{'源地址':<20} {'原始目的':<20} {'转换后目的':<20} {'类型':<8} {'状态':<12}")
print(f"{'-'*20} {'-'*20} {'-'*20} {'-'*8} {'-'*12}")
for conn in connections[:20]: # 只显示前 20 条
src = f"{conn.original_src_ip}:{conn.original_src_port}"
original_dst = f"{conn.original_dst_ip}:{conn.original_dst_port}"
translated_dst = f"{conn.translated_dst_ip}:{conn.translated_dst_port}"
print(f"{src:<20} {original_dst:<20} {translated_dst:<20} {conn.nat_type:<8} {conn.state.value:<12}")
if len(connections) > 20:
print(f" ... 还有 {len(connections) - 20} 条连接")
def show_stats(self):
"""显示统计信息"""
print(f"\n{'='*70}")
print(f"📈 统计信息 - {self.name}")
print(f"{'='*70}")
print(f" 已转换数据包: {self.stats['packets_translated']}")
print(f" 已丢弃数据包: {self.stats['packets_dropped']}")
print(f" SNAT 转换次数: {self.stats['snat_translations']}")
print(f" DNAT 转换次数: {self.stats['dnat_translations']}")
print(f" 活跃连接数: {self.conntrack.stats['active_connections']}")
print(f" 历史连接总数: {self.conntrack.stats['total_connections']}")3.4 端口分配器与网络设备
做什么:实现 PortAllocator 以及用于模拟发包的 Host 类。
为什么这么做:我们需要基础设施来进行测试演示。
# ============================================================
# 端口分配器
# ============================================================
class PortAllocator:
"""端口分配器"""
def __init__(self, start: int = 10000, end: int = 60000):
self.start = start
self.end = end
self.used_ports: Set[int] = set()
def allocate(self) -> Optional[int]:
"""分配一个可用端口"""
for port in range(self.start, self.end + 1):
if port not in self.used_ports:
self.used_ports.add(port)
return port
return None
def release(self, port: int):
"""释放一个端口"""
self.used_ports.discard(port)
# ============================================================
# 模拟网络设备
# ============================================================
class Host:
"""模拟网络主机"""
def __init__(self, name: str, ip: str, gateway: str = None):
self.name = name
self.ip = ip
self.gateway = gateway
self.port_counter = 10000
print(f"✅ 主机 [{name}] 创建成功,IP: {ip}")
def create_packet(self, dst_ip: str, dst_port: int, payload: str = "") -> Packet:
"""创建一个数据包"""
self.port_counter += 1
return Packet(
src_ip=self.ip,
dst_ip=dst_ip,
src_port=self.port_counter,
dst_port=dst_port,
payload=payload
)
def receive_packet(self, packet: Packet):
"""接收一个数据包"""
print(f" 📥 [{self.name}] 收到数据包: {packet}")
if packet.payload:
print(f" 负载: {packet.payload}")3.5 交互式 CLI Shell
做什么:用 Python 自带的 cmd 模块构建一个类似网络设备路由器的交互式命令行界面,替代硬编码的演示逻辑。
为什么这么做:让读者可以在终端里自由、动态地创建网络资源并验证连通性,带来真实的沉浸感操作体验。
import cmd
import shlex
class GatewayShell(cmd.Cmd):
intro = '===========================================================\n' \
'欢迎使用 NAT Gateway 交互式模拟器\n' \
'【可用指令】:\n' \
' 1. create_gateway <name> <public_ip> <private_ip> - 创建 NAT 网关\n' \
' 2. create_host <name> <ip> - 创建主机\n' \
' 3. add_snat <name> <src_cidr> <translated_ip> - 添加 SNAT 规则\n' \
' 4. add_dnat <name> <pub_ip> <pub_port> <priv_ip> <priv_port> - 添加 DNAT 规则\n' \
' 5. snat <host_name> <dst_ip> <dst_port> - 测试出站访问(SNAT)\n' \
' 6. dnat <host_name> <dst_ip> <dst_port> - 测试入站访问(DNAT)\n' \
' 7. show_rules - 查看 NAT 规则\n' \
' 8. show_connections - 查看连接跟踪表\n' \
' 9. show_stats - 查看统计信息\n' \
' 10. reset - 重置所有资源\n' \
' 11. exit - 退出\n' \
'==========================================================='
prompt = '(Gateway-Sim) '
def emptyline(self): pass
def do_create_gateway(self, arg):
args = shlex.split(arg)
if len(args) != 3: return print("❌ 用法: create_gateway <name> <public_ip> <private_ip>")
ALL_GATEWAYS[args[0]] = NATGateway(args[0], public_ip=args[1], private_ip=args[2])
def do_create_host(self, arg):
args = shlex.split(arg)
if len(args) != 2: return print("❌ 用法: create_host <name> <ip>")
ALL_HOSTS[args[0]] = Host(args[0], args[1])
def do_add_snat(self, arg):
args = shlex.split(arg)
if len(args) != 3: return print("❌ 用法: add_snat <name> <src_cidr> <translated_ip>")
if not ALL_GATEWAYS: return print("❌ 没有可用的 NAT 网关")
gateway = list(ALL_GATEWAYS.values())[0]
rule = NATRule(name=args[0], rule_type="snat", src_cidr=args[1], translated_ip=args[2], priority=100)
gateway.add_snat_rule(rule)
print(f"✅ 成功添加 SNAT 规则: {args[1]} → {args[2]}")
def do_add_dnat(self, arg):
args = shlex.split(arg)
if len(args) != 5: return print("❌ 用法: add_dnat <name> <pub_ip> <pub_port> <priv_ip> <priv_port>")
if not ALL_GATEWAYS: return print("❌ 没有可用的 NAT 网关")
gateway = list(ALL_GATEWAYS.values())[0]
rule = NATRule(name=args[0], rule_type="dnat", dst_ip=args[1], dst_port=int(args[2]), translated_ip=args[3], translated_port=int(args[4]), priority=100)
gateway.add_dnat_rule(rule)
print(f"✅ 成功添加 DNAT 规则: {args[1]}:{args[2]} → {args[3]}:{args[4]}")
def do_snat(self, arg):
args = shlex.split(arg)
if len(args) < 3: return print("❌ 用法: snat <host_name> <dst_ip> <dst_port>")
host_name, dst_ip, dst_port = args[0], args[1], int(args[2])
host = ALL_HOSTS.get(host_name)
if not host: return print(f"❌ 主机 {host_name} 不存在")
if not ALL_GATEWAYS: return print("❌ 没有可用的 NAT 网关")
gateway = list(ALL_GATEWAYS.values())[0]
request = host.create_packet(dst_ip, dst_port, f"SNAT Request from {host_name}")
translated_request = gateway.process_packet(request, "outbound")
if translated_request:
print(f"\n✅ [出站] SNAT 转换成功!")
print(f" 原始请求: {host.ip}:{request.src_port} → {dst_ip}:{dst_port}")
print(f" 转换请求: {translated_request.src_ip}:{translated_request.src_port} → {translated_request.dst_ip}:{translated_request.dst_port}")
# 模拟公网服务器返回响应
response = Packet(
src_ip=dst_ip, dst_ip=translated_request.src_ip,
src_port=dst_port, dst_port=translated_request.src_port,
payload=f"Response to {host_name}"
)
print(f"\n🌍 公网服务器 {dst_ip}:{dst_port} 处理请求并返回响应...")
# NAT 网关处理入站响应
translated_response = gateway.process_packet(response, "inbound")
if translated_response:
print(f"✅ [入站] 反向 SNAT 转换成功!")
print(f" 原始响应: {response.src_ip}:{response.src_port} → {response.dst_ip}:{response.dst_port}")
print(f" 转换响应: {translated_response.src_ip}:{translated_response.src_port} → {translated_response.dst_ip}:{translated_response.dst_port}")
print(f"🎉 {host_name} 成功收到外网响应!")
else:
print(f"❌ [入站] 反向 SNAT 转换失败(无法找到连接记录)")
else:
print(f"\n❌ [出站] SNAT 转换失败")
def do_dnat(self, arg):
args = shlex.split(arg)
if len(args) < 3: return print("❌ 用法: dnat <host_name> <dst_ip> <dst_port>")
host_name, dst_ip, dst_port = args[0], args[1], int(args[2])
host = ALL_HOSTS.get(host_name)
if not host: return print(f"❌ 主机 {host_name} 不存在")
if not ALL_GATEWAYS: return print("❌ 没有可用的 NAT 网关")
gateway = list(ALL_GATEWAYS.values())[0]
request = host.create_packet(dst_ip, dst_port, f"DNAT Request from {host_name}")
translated_request = gateway.process_packet(request, "inbound")
if translated_request:
print(f"\n✅ [入站] DNAT 转换成功!")
print(f" 原始请求: {host.ip}:{request.src_port} → {dst_ip}:{dst_port}")
print(f" 转换请求: {translated_request.src_ip}:{translated_request.src_port} → {translated_request.dst_ip}:{translated_request.dst_port}")
# 模拟内网服务器返回响应
response = Packet(
src_ip=translated_request.dst_ip, dst_ip=translated_request.src_ip,
src_port=translated_request.dst_port, dst_port=translated_request.src_port,
payload=f"Response from internal server"
)
print(f"\n🖥️ 内网服务器 {translated_request.dst_ip}:{translated_request.dst_port} 处理请求并返回响应...")
# NAT 网关处理出站响应
translated_response = gateway.process_packet(response, "outbound")
if translated_response:
print(f"✅ [出站] 反向 DNAT 转换成功!")
print(f" 原始响应: {response.src_ip}:{response.src_port} → {response.dst_ip}:{response.dst_port}")
print(f" 转换响应: {translated_response.src_ip}:{translated_response.src_port} → {translated_response.dst_ip}:{translated_response.dst_port}")
print(f"🎉 公网用户 {host_name} 成功收到内网服务响应!")
else:
print(f"❌ [出站] 反向 DNAT 转换失败(无法找到连接记录)")
else:
print(f"\n❌ [入站] DNAT 转换失败(可能没有匹配的规则)")
def do_show_rules(self, arg):
if not ALL_GATEWAYS: return print("❌ 没有可用的 NAT 网关")
for gw in ALL_GATEWAYS.values(): gw.show_rules()
def do_show_connections(self, arg):
if not ALL_GATEWAYS: return print("❌ 没有可用的 NAT 网关")
for gw in ALL_GATEWAYS.values(): gw.show_connections()
def do_show_stats(self, arg):
if not ALL_GATEWAYS: return print("❌ 没有可用的 NAT 网关")
for gw in ALL_GATEWAYS.values(): gw.show_stats()
def do_reset(self, arg):
ALL_GATEWAYS.clear(); ALL_HOSTS.clear()
print("✅ 所有资源已重置")
def do_exit(self, arg):
print("👋 再见!")
return True
if __name__ == "__main__":
GatewayShell().cmdloop()一键体验配置:
将以下指令粘贴进模拟器,可以直接体验 NAT 网关(同时包含 SNAT 和 DNAT)的核心功能:
create_gateway nat-gw 123.56.78.90 10.0.0.1
create_host web-server 192.168.1.10
create_host internal-pc 192.168.1.100
create_host public-user 203.0.113.100
add_snat internal-snat 192.168.1.0/24 123.56.78.90
add_dnat web-server 123.56.78.90 80 192.168.1.10 80
snat internal-pc 180.101.50.24 80
dnat public-user 123.56.78.90 80
show_rules
show_connections
show_stats四、完整可运行代码
将上面所有代码片段合并,保存为 nat_gateway_demo.py 即可运行。完整代码如下:
#!/usr/bin/env python3
"""
完整 NAT 网关实现
功能:SNAT + DNAT + 连接跟踪 + 高级特性
"""
import time
import threading
from dataclasses import dataclass, field
from typing import Dict, Optional, List, Set, Tuple
from enum import Enum
from collections import defaultdict
# ============================================================
# 数据结构定义
# ============================================================
class ConnectionState(Enum):
"""连接状态"""
NEW = "NEW"
ESTABLISHED = "ESTABLISHED"
RELATED = "RELATED"
CLOSED = "CLOSED"
TIME_WAIT = "TIME_WAIT"
class Protocol(Enum):
"""协议类型"""
TCP = "tcp"
UDP = "udp"
ICMP = "icmp"
@dataclass
class Packet:
"""数据包"""
src_ip: str
dst_ip: str
src_port: int
dst_port: int
protocol: str = "tcp"
payload: str = ""
timestamp: float = field(default_factory=time.time)
@property
def connection_key(self) -> str:
"""连接的唯一标识(四元组)"""
return f"{self.src_ip}:{self.src_port}-{self.dst_ip}:{self.dst_port}-{self.protocol}"
def __str__(self):
return f"[{self.protocol}] {self.src_ip}:{self.src_port} → {self.dst_ip}:{self.dst_port}"
@dataclass
class NATRule:
"""NAT 规则"""
name: str
rule_type: str # "snat" 或 "dnat"
src_cidr: str = "" # 源网段(SNAT)
dst_ip: str = "" # 目的 IP(DNAT)
dst_port: int = 0 # 目的端口(DNAT)
translated_ip: str = "" # 转换后的 IP
translated_port: int = 0 # 转换后的端口
protocol: str = "tcp"
priority: int = 100 # 优先级(数字越小优先级越高)
enabled: bool = True
def matches_snat(self, packet: Packet) -> bool:
"""检查是否匹配 SNAT 规则"""
if self.rule_type != "snat" or not self.enabled:
return False
# 简化的 CIDR 匹配(实际应该使用 ipaddress 模块)
return packet.src_ip.startswith(self.src_cidr.rsplit('.', 1)[0])
def matches_dnat(self, packet: Packet) -> bool:
"""检查是否匹配 DNAT 规则"""
if self.rule_type != "dnat" or not self.enabled:
return False
return (
packet.dst_ip == self.dst_ip and
packet.dst_port == self.dst_port and
packet.protocol == self.protocol
)
@dataclass
class ConnectionEntry:
"""连接跟踪条目"""
# 原始连接信息
original_src_ip: str
original_src_port: int
original_dst_ip: str
original_dst_port: int
protocol: str
# 转换后的连接信息
translated_src_ip: str
translated_src_port: int
translated_dst_ip: str
translated_dst_port: int
# 连接状态
state: ConnectionState = ConnectionState.NEW
nat_type: str = "" # "snat", "dnat", "both"
# 时间信息
created_at: float = field(default_factory=time.time)
last_seen: float = field(default_factory=time.time)
bytes_sent: int = 0
bytes_received: int = 0
@property
def original_key(self) -> str:
"""原始连接的唯一标识(请求方向)"""
return f"{self.original_src_ip}:{self.original_src_port}-{self.original_dst_ip}:{self.original_dst_port}-{self.protocol}"
@property
def reply_key(self) -> str:
"""响应连接的唯一标识(期望收到的响应包四元组)"""
return f"{self.translated_dst_ip}:{self.translated_dst_port}-{self.translated_src_ip}:{self.translated_src_port}-{self.protocol}"
def update_activity(self):
"""更新活动时间"""
self.last_seen = time.time()
self.state = ConnectionState.ESTABLISHED
def is_expired(self, timeout: int = 300) -> bool:
"""检查连接是否过期"""
return time.time() - self.last_seen > timeout
# ============================================================
# 连接跟踪管理器
# ============================================================
class ConnectionTracker:
"""连接跟踪管理器"""
def __init__(self, timeout: int = 300):
"""
初始化连接跟踪管理器
Args:
timeout: 连接超时时间(秒)
"""
self.connections: Dict[str, ConnectionEntry] = {}
self.reverse_connections: Dict[str, str] = {}
self.timeout = timeout
self.lock = threading.Lock()
# 统计信息
self.stats = {
'total_connections': 0,
'active_connections': 0,
'expired_connections': 0
}
# 启动清理线程
self._start_cleanup_thread()
def _start_cleanup_thread(self):
"""启动过期连接清理线程"""
def cleanup_loop():
while True:
time.sleep(60) # 每分钟清理一次
self.cleanup_expired()
thread = threading.Thread(target=cleanup_loop, daemon=True)
thread.start()
def add_connection(self, entry: ConnectionEntry):
"""添加连接"""
with self.lock:
self.connections[entry.original_key] = entry
self.reverse_connections[entry.reply_key] = entry.original_key
self.stats['total_connections'] += 1
self.stats['active_connections'] += 1
def find_by_original(self, key: str) -> Optional[ConnectionEntry]:
"""通过原始连接查找"""
with self.lock:
entry = self.connections.get(key)
if entry:
entry.update_activity()
return entry
def find_by_reply(self, key: str) -> Optional[ConnectionEntry]:
"""通过响应连接查找"""
with self.lock:
original_key = self.reverse_connections.get(key)
if original_key:
entry = self.connections.get(original_key)
if entry:
entry.update_activity()
return entry
return None
def cleanup_expired(self):
"""清理过期连接"""
with self.lock:
expired_keys = []
for key, entry in self.connections.items():
if entry.is_expired(self.timeout):
expired_keys.append(key)
for key in expired_keys:
entry = self.connections[key]
del self.connections[key]
if entry.translated_key in self.reverse_connections:
del self.reverse_connections[entry.translated_key]
self.stats['expired_connections'] += 1
self.stats['active_connections'] -= 1
if expired_keys:
print(f"🧹 清理了 {len(expired_keys)} 个过期连接")
def get_all_connections(self) -> List[ConnectionEntry]:
"""获取所有连接"""
with self.lock:
return list(self.connections.values())
# ============================================================
# NAT 网关实现
# ============================================================
class NATGateway:
"""
完整 NAT 网关实现
功能:SNAT + DNAT + 连接跟踪
"""
def __init__(self, name: str, public_ip: str, private_ip: str):
"""
初始化 NAT 网关
Args:
name: 网关名称
public_ip: 公网 IP
private_ip: 内网 IP
"""
self.name = name
self.public_ip = public_ip
self.private_ip = private_ip
# 连接跟踪管理器
self.conntrack = ConnectionTracker(timeout=300)
# NAT 规则
self.snat_rules: List[NATRule] = []
self.dnat_rules: List[NATRule] = []
# 端口分配器
self.port_allocator = PortAllocator(start=10000, end=60000)
# 统计信息
self.stats = {
'packets_translated': 0,
'packets_dropped': 0,
'snat_translations': 0,
'dnat_translations': 0
}
print(f"✅ NAT 网关 [{name}] 创建成功")
print(f" 公网 IP: {public_ip}")
print(f" 内网 IP: {private_ip}")
def add_snat_rule(self, rule: NATRule):
"""添加 SNAT 规则"""
rule.rule_type = "snat"
self.snat_rules.append(rule)
self.snat_rules.sort(key=lambda r: r.priority)
print(f"✅ 添加 SNAT 规则: {rule.name}")
print(f" {rule.src_cidr} → {rule.translated_ip}")
def add_dnat_rule(self, rule: NATRule):
"""添加 DNAT 规则"""
rule.rule_type = "dnat"
self.dnat_rules.append(rule)
self.dnat_rules.sort(key=lambda r: r.priority)
print(f"✅ 添加 DNAT 规则: {rule.name}")
print(f" {rule.dst_ip}:{rule.dst_port} → {rule.translated_ip}:{rule.translated_port}")
def process_packet(self, packet: Packet, direction: str) -> Optional[Packet]:
"""
处理数据包
Args:
packet: 原始数据包
direction: 方向 ("inbound" 或 "outbound")
Returns:
转换后的数据包
"""
if direction == "inbound":
return self._process_inbound(packet)
else:
return self._process_outbound(packet)
def _process_inbound(self, packet: Packet) -> Optional[Packet]:
"""处理入站数据包(公网 → 内网)"""
print(f"\n📥 处理入站数据包: {packet}")
# 1. 检查是否已有连接
conn_key = f"{packet.src_ip}:{packet.src_port}-{packet.dst_ip}:{packet.dst_port}-{packet.protocol}"
# 可能是已有 DNAT 连接的后续请求
existing_conn = self.conntrack.find_by_original(conn_key)
# 也可能是已有 SNAT 连接的返回响应
if not existing_conn:
existing_conn = self.conntrack.find_by_reply(conn_key)
if existing_conn:
# 已有连接,直接使用
print(f" ✅ 命中已有连接 ({existing_conn.nat_type})")
if conn_key == existing_conn.original_key:
# 这是 DNAT 的原方向报文
translated = Packet(
src_ip=packet.src_ip,
dst_ip=existing_conn.translated_dst_ip,
src_port=packet.src_port,
dst_port=existing_conn.translated_dst_port,
protocol=packet.protocol,
payload=packet.payload
)
else:
# 这是 SNAT 的响应报文
translated = Packet(
src_ip=packet.src_ip,
dst_ip=existing_conn.original_src_ip,
src_port=packet.src_port,
dst_port=existing_conn.original_src_port,
protocol=packet.protocol,
payload=packet.payload
)
self.stats['packets_translated'] += 1
return translated
# 2. 查找匹配的 DNAT 规则
for rule in self.dnat_rules:
if rule.matches_dnat(packet):
print(f" ✅ 匹配 DNAT 规则: {rule.name}")
# 创建连接条目
entry = ConnectionEntry(
original_src_ip=packet.src_ip,
original_src_port=packet.src_port,
original_dst_ip=packet.dst_ip,
original_dst_port=packet.dst_port,
protocol=packet.protocol,
translated_src_ip=packet.src_ip,
translated_src_port=packet.src_port,
translated_dst_ip=rule.translated_ip,
translated_dst_port=rule.translated_port,
nat_type="dnat"
)
self.conntrack.add_connection(entry)
# 创建转换后的数据包
translated = Packet(
src_ip=packet.src_ip,
dst_ip=rule.translated_ip,
src_port=packet.src_port,
dst_port=rule.translated_port,
protocol=packet.protocol,
payload=packet.payload
)
self.stats['packets_translated'] += 1
self.stats['dnat_translations'] += 1
return translated
print(f" ❌ 没有匹配的规则,丢弃数据包")
self.stats['packets_dropped'] += 1
return None
def _process_outbound(self, packet: Packet) -> Optional[Packet]:
"""处理出站数据包(内网 → 公网)"""
print(f"\n📤 处理出站数据包: {packet}")
# 1. 检查是否已有连接
conn_key = f"{packet.src_ip}:{packet.src_port}-{packet.dst_ip}:{packet.dst_port}-{packet.protocol}"
# 可能是已有 SNAT 连接的后续请求
existing_conn = self.conntrack.find_by_original(conn_key)
# 也可能是已有 DNAT 连接的返回响应
if not existing_conn:
existing_conn = self.conntrack.find_by_reply(conn_key)
if existing_conn:
# 已有连接,直接使用
print(f" ✅ 命中已有连接 ({existing_conn.nat_type})")
if conn_key == existing_conn.original_key:
# 这是 SNAT 的原方向报文
translated = Packet(
src_ip=existing_conn.translated_src_ip,
dst_ip=packet.dst_ip,
src_port=existing_conn.translated_src_port,
dst_port=packet.dst_port,
protocol=packet.protocol,
payload=packet.payload
)
else:
# 这是 DNAT 的响应报文
translated = Packet(
src_ip=existing_conn.original_dst_ip,
dst_ip=packet.dst_ip,
src_port=existing_conn.original_dst_port,
dst_port=packet.dst_port,
protocol=packet.protocol,
payload=packet.payload
)
self.stats['packets_translated'] += 1
return translated
# 2. 查找匹配的 SNAT 规则
for rule in self.snat_rules:
if rule.matches_snat(packet):
print(f" ✅ 匹配 SNAT 规则: {rule.name}")
# 分配端口
translated_port = self.port_allocator.allocate()
if translated_port is None:
print(f" ❌ 端口耗尽,丢弃数据包")
self.stats['packets_dropped'] += 1
return None
# 创建连接条目
entry = ConnectionEntry(
original_src_ip=packet.src_ip,
original_src_port=packet.src_port,
original_dst_ip=packet.dst_ip,
original_dst_port=packet.dst_port,
protocol=packet.protocol,
translated_src_ip=rule.translated_ip,
translated_src_port=translated_port,
translated_dst_ip=packet.dst_ip,
translated_dst_port=packet.dst_port,
nat_type="snat"
)
self.conntrack.add_connection(entry)
# 创建转换后的数据包
translated = Packet(
src_ip=rule.translated_ip,
dst_ip=packet.dst_ip,
src_port=translated_port,
dst_port=packet.dst_port,
protocol=packet.protocol,
payload=packet.payload
)
self.stats['packets_translated'] += 1
self.stats['snat_translations'] += 1
return translated
# 3. 如果没有 SNAT 规则,直接转发
print(f" ⚠️ 没有匹配的 SNAT 规则,直接转发")
return packet
def show_rules(self):
"""显示所有规则"""
print(f"\n{'='*70}")
print(f"📋 NAT 规则列表 - {self.name}")
print(f"{'='*70}")
print(f"\n--- SNAT 规则 ---")
if not self.snat_rules:
print(" (空)")
else:
print(f"{'名称':<15} {'源网段':<20} {'转换IP':<20} {'优先级':<8} {'状态':<8}")
print(f"{'-'*15} {'-'*20} {'-'*20} {'-'*8} {'-'*8}")
for rule in self.snat_rules:
status = "启用" if rule.enabled else "禁用"
print(f"{rule.name:<15} {rule.src_cidr:<20} {rule.translated_ip:<20} {rule.priority:<8} {status:<8}")
print(f"\n--- DNAT 规则 ---")
if not self.dnat_rules:
print(" (空)")
else:
print(f"{'名称':<15} {'目的地址':<25} {'转换地址':<25} {'优先级':<8} {'状态':<8}")
print(f"{'-'*15} {'-'*25} {'-'*25} {'-'*8} {'-'*8}")
for rule in self.dnat_rules:
dst = f"{rule.dst_ip}:{rule.dst_port}"
translated = f"{rule.translated_ip}:{rule.translated_port}"
status = "启用" if rule.enabled else "禁用"
print(f"{rule.name:<15} {dst:<25} {translated:<25} {rule.priority:<8} {status:<8}")
def show_connections(self):
"""显示连接跟踪表"""
print(f"\n{'='*70}")
print(f"📊 连接跟踪表 - {self.name}")
print(f"{'='*70}")
connections = self.conntrack.get_all_connections()
if not connections:
print(" (空)")
return
print(f"{'源地址':<20} {'原始目的':<20} {'转换后目的':<20} {'类型':<8} {'状态':<12}")
print(f"{'-'*20} {'-'*20} {'-'*20} {'-'*8} {'-'*12}")
for conn in connections[:20]: # 只显示前 20 条
src = f"{conn.original_src_ip}:{conn.original_src_port}"
original_dst = f"{conn.original_dst_ip}:{conn.original_dst_port}"
translated_dst = f"{conn.translated_dst_ip}:{conn.translated_dst_port}"
print(f"{src:<20} {original_dst:<20} {translated_dst:<20} {conn.nat_type:<8} {conn.state.value:<12}")
if len(connections) > 20:
print(f" ... 还有 {len(connections) - 20} 条连接")
def show_stats(self):
"""显示统计信息"""
print(f"\n{'='*70}")
print(f"📈 统计信息 - {self.name}")
print(f"{'='*70}")
print(f" 已转换数据包: {self.stats['packets_translated']}")
print(f" 已丢弃数据包: {self.stats['packets_dropped']}")
print(f" SNAT 转换次数: {self.stats['snat_translations']}")
print(f" DNAT 转换次数: {self.stats['dnat_translations']}")
print(f" 活跃连接数: {self.conntrack.stats['active_connections']}")
print(f" 历史连接总数: {self.conntrack.stats['total_connections']}")
# ============================================================
# 端口分配器
# ============================================================
class PortAllocator:
"""端口分配器"""
def __init__(self, start: int = 10000, end: int = 60000):
self.start = start
self.end = end
self.used_ports: Set[int] = set()
def allocate(self) -> Optional[int]:
"""分配一个可用端口"""
for port in range(self.start, self.end + 1):
if port not in self.used_ports:
self.used_ports.add(port)
return port
return None
def release(self, port: int):
"""释放一个端口"""
self.used_ports.discard(port)
# ============================================================
# 模拟网络设备
# ============================================================
class Host:
"""模拟网络主机"""
def __init__(self, name: str, ip: str, gateway: str = None):
self.name = name
self.ip = ip
self.gateway = gateway
self.port_counter = 10000
print(f"✅ 主机 [{name}] 创建成功,IP: {ip}")
def create_packet(self, dst_ip: str, dst_port: int, payload: str = "") -> Packet:
"""创建一个数据包"""
self.port_counter += 1
return Packet(
src_ip=self.ip,
dst_ip=dst_ip,
src_port=self.port_counter,
dst_port=dst_port,
payload=payload
)
def receive_packet(self, packet: Packet):
"""接收一个数据包"""
print(f" 📥 [{self.name}] 收到数据包: {packet}")
if packet.payload:
print(f" 负载: {packet.payload}")
# ============================================================
# 交互式 CLI Shell
# ============================================================
import cmd
import shlex
# 全局资源
ALL_GATEWAYS: Dict[str, NATGateway] = {}
ALL_HOSTS: Dict[str, Host] = {}
class GatewayShell(cmd.Cmd):
intro = '===========================================================\n' \
'欢迎使用 NAT Gateway 交互式模拟器\n' \
'【可用指令】:\n' \
' 1. create_gateway <name> <public_ip> <private_ip> - 创建 NAT 网关\n' \
' 2. create_host <name> <ip> - 创建主机\n' \
' 3. add_snat <name> <src_cidr> <translated_ip> - 添加 SNAT 规则\n' \
' 4. add_dnat <name> <pub_ip> <pub_port> <priv_ip> <priv_port> - 添加 DNAT 规则\n' \
' 5. snat <host_name> <dst_ip> <dst_port> - 测试出站访问(SNAT)\n' \
' 6. dnat <host_name> <dst_ip> <dst_port> - 测试入站访问(DNAT)\n' \
' 7. show_rules - 查看 NAT 规则\n' \
' 8. show_connections - 查看连接跟踪表\n' \
' 9. show_stats - 查看统计信息\n' \
' 10. reset - 重置所有资源\n' \
' 11. exit - 退出\n' \
'==========================================================='
prompt = '(Gateway-Sim) '
def emptyline(self): pass
def do_create_gateway(self, arg):
args = shlex.split(arg)
if len(args) != 3: return print("❌ 用法: create_gateway <name> <public_ip> <private_ip>")
ALL_GATEWAYS[args[0]] = NATGateway(args[0], public_ip=args[1], private_ip=args[2])
def do_create_host(self, arg):
args = shlex.split(arg)
if len(args) != 2: return print("❌ 用法: create_host <name> <ip>")
ALL_HOSTS[args[0]] = Host(args[0], args[1])
def do_add_snat(self, arg):
args = shlex.split(arg)
if len(args) != 3: return print("❌ 用法: add_snat <name> <src_cidr> <translated_ip>")
if not ALL_GATEWAYS: return print("❌ 没有可用的 NAT 网关")
gateway = list(ALL_GATEWAYS.values())[0]
rule = NATRule(name=args[0], rule_type="snat", src_cidr=args[1], translated_ip=args[2], priority=100)
gateway.add_snat_rule(rule)
print(f"✅ 成功添加 SNAT 规则: {args[1]} → {args[2]}")
def do_add_dnat(self, arg):
args = shlex.split(arg)
if len(args) != 5: return print("❌ 用法: add_dnat <name> <pub_ip> <pub_port> <priv_ip> <priv_port>")
if not ALL_GATEWAYS: return print("❌ 没有可用的 NAT 网关")
gateway = list(ALL_GATEWAYS.values())[0]
rule = NATRule(name=args[0], rule_type="dnat", dst_ip=args[1], dst_port=int(args[2]), translated_ip=args[3], translated_port=int(args[4]), priority=100)
gateway.add_dnat_rule(rule)
print(f"✅ 成功添加 DNAT 规则: {args[1]}:{args[2]} → {args[3]}:{args[4]}")
def do_snat(self, arg):
args = shlex.split(arg)
if len(args) < 3: return print("❌ 用法: snat <host_name> <dst_ip> <dst_port>")
host_name, dst_ip, dst_port = args[0], args[1], int(args[2])
host = ALL_HOSTS.get(host_name)
if not host: return print(f"❌ 主机 {host_name} 不存在")
if not ALL_GATEWAYS: return print("❌ 没有可用的 NAT 网关")
gateway = list(ALL_GATEWAYS.values())[0]
request = host.create_packet(dst_ip, dst_port, f"SNAT Request from {host_name}")
translated_request = gateway.process_packet(request, "outbound")
if translated_request:
print(f"\n✅ [出站] SNAT 转换成功!")
print(f" 原始请求: {host.ip}:{request.src_port} → {dst_ip}:{dst_port}")
print(f" 转换请求: {translated_request.src_ip}:{translated_request.src_port} → {translated_request.dst_ip}:{translated_request.dst_port}")
# 模拟公网服务器返回响应
response = Packet(
src_ip=dst_ip, dst_ip=translated_request.src_ip,
src_port=dst_port, dst_port=translated_request.src_port,
payload=f"Response to {host_name}"
)
print(f"\n🌍 公网服务器 {dst_ip}:{dst_port} 处理请求并返回响应...")
# NAT 网关处理入站响应
translated_response = gateway.process_packet(response, "inbound")
if translated_response:
print(f"✅ [入站] 反向 SNAT 转换成功!")
print(f" 原始响应: {response.src_ip}:{response.src_port} → {response.dst_ip}:{response.dst_port}")
print(f" 转换响应: {translated_response.src_ip}:{translated_response.src_port} → {translated_response.dst_ip}:{translated_response.dst_port}")
print(f"🎉 {host_name} 成功收到外网响应!")
else:
print(f"❌ [入站] 反向 SNAT 转换失败(无法找到连接记录)")
else:
print(f"\n❌ [出站] SNAT 转换失败")
def do_dnat(self, arg):
args = shlex.split(arg)
if len(args) < 3: return print("❌ 用法: dnat <host_name> <dst_ip> <dst_port>")
host_name, dst_ip, dst_port = args[0], args[1], int(args[2])
host = ALL_HOSTS.get(host_name)
if not host: return print(f"❌ 主机 {host_name} 不存在")
if not ALL_GATEWAYS: return print("❌ 没有可用的 NAT 网关")
gateway = list(ALL_GATEWAYS.values())[0]
request = host.create_packet(dst_ip, dst_port, f"DNAT Request from {host_name}")
translated_request = gateway.process_packet(request, "inbound")
if translated_request:
print(f"\n✅ [入站] DNAT 转换成功!")
print(f" 原始请求: {host.ip}:{request.src_port} → {dst_ip}:{dst_port}")
print(f" 转换请求: {translated_request.src_ip}:{translated_request.src_port} → {translated_request.dst_ip}:{translated_request.dst_port}")
# 模拟内网服务器返回响应
response = Packet(
src_ip=translated_request.dst_ip, dst_ip=translated_request.src_ip,
src_port=translated_request.dst_port, dst_port=translated_request.src_port,
payload=f"Response from internal server"
)
print(f"\n🖥️ 内网服务器 {translated_request.dst_ip}:{translated_request.dst_port} 处理请求并返回响应...")
# NAT 网关处理出站响应
translated_response = gateway.process_packet(response, "outbound")
if translated_response:
print(f"✅ [出站] 反向 DNAT 转换成功!")
print(f" 原始响应: {response.src_ip}:{response.src_port} → {response.dst_ip}:{response.dst_port}")
print(f" 转换响应: {translated_response.src_ip}:{translated_response.src_port} → {translated_response.dst_ip}:{translated_response.dst_port}")
print(f"🎉 公网用户 {host_name} 成功收到内网服务响应!")
else:
print(f"❌ [出站] 反向 DNAT 转换失败(无法找到连接记录)")
else:
print(f"\n❌ [入站] DNAT 转换失败(可能没有匹配的规则)")
def do_show_rules(self, arg):
if not ALL_GATEWAYS: return print("❌ 没有可用的 NAT 网关")
for gw in ALL_GATEWAYS.values(): gw.show_rules()
def do_show_connections(self, arg):
if not ALL_GATEWAYS: return print("❌ 没有可用的 NAT 网关")
for gw in ALL_GATEWAYS.values(): gw.show_connections()
def do_show_stats(self, arg):
if not ALL_GATEWAYS: return print("❌ 没有可用的 NAT 网关")
for gw in ALL_GATEWAYS.values(): gw.show_stats()
def do_reset(self, arg):
ALL_GATEWAYS.clear(); ALL_HOSTS.clear()
print("✅ 所有资源已重置")
def do_exit(self, arg):
print("👋 再见!")
return True
if __name__ == "__main__":
GatewayShell().cmdloop()五、运行方法
- 将代码保存为
nat_part3_advance.py - 执行:
python3 nat_part3_advance.py- 运行结果
root@ubuntu:~# python3 nat_part3_advance.py
===========================================================
欢迎使用 NAT Gateway 交互式模拟器
【可用指令】:
1. create_gateway <name> <public_ip> <private_ip> - 创建 NAT 网关
2. create_host <name> <ip> - 创建主机
3. add_snat <name> <src_cidr> <translated_ip> - 添加 SNAT 规则
4. add_dnat <name> <pub_ip> <pub_port> <priv_ip> <priv_port> - 添加 DNAT 规则
5. snat <host_name> <dst_ip> <dst_port> - 测试出站访问(SNAT)
6. dnat <host_name> <dst_ip> <dst_port> - 测试入站访问(DNAT)
7. show_rules - 查看 NAT 规则
8. show_connections - 查看连接跟踪表
9. show_stats - 查看统计信息
10. reset - 重置所有资源
11. exit - 退出
===========================================================
(Gateway-Sim) create_gateway nat-gw 123.56.78.90 10.0.0.1
✅ NAT 网关 [nat-gw] 创建成功
公网 IP: 123.56.78.90
内网 IP: 10.0.0.1
(Gateway-Sim) create_host web-server 192.168.1.10
✅ 主机 [web-server] 创建成功,IP: 192.168.1.10
(Gateway-Sim) create_host internal-pc 192.168.1.100
✅ 主机 [internal-pc] 创建成功,IP: 192.168.1.100
(Gateway-Sim) create_host public-user 203.0.113.100
✅ 主机 [public-user] 创建成功,IP: 203.0.113.100
(Gateway-Sim) add_snat internal-snat 192.168.1.0/24 123.56.78.90
✅ 添加 SNAT 规则: internal-snat
192.168.1.0/24 → 123.56.78.90
✅ 成功添加 SNAT 规则: 192.168.1.0/24 → 123.56.78.90
(Gateway-Sim) add_dnat web-server 123.56.78.90 80 192.168.1.10 80
✅ 添加 DNAT 规则: web-server
123.56.78.90:80 → 192.168.1.10:80
✅ 成功添加 DNAT 规则: 123.56.78.90:80 → 192.168.1.10:80
(Gateway-Sim) snat internal-pc 180.101.50.24 80
📤 处理出站数据包: [tcp] 192.168.1.100:10001 → 180.101.50.24:80
✅ 匹配 SNAT 规则: internal-snat
✅ [出站] SNAT 转换成功!
原始请求: 192.168.1.100:10001 → 180.101.50.24:80
转换请求: 123.56.78.90:10000 → 180.101.50.24:80
🌍 公网服务器 180.101.50.24:80 处理请求并返回响应...
📥 处理入站数据包: [tcp] 180.101.50.24:80 → 123.56.78.90:10000
✅ 命中已有连接 (snat)
✅ [入站] 反向 SNAT 转换成功!
原始响应: 180.101.50.24:80 → 123.56.78.90:10000
转换响应: 180.101.50.24:80 → 192.168.1.100:10001
🎉 internal-pc 成功收到外网响应!
(Gateway-Sim) dnat public-user 123.56.78.90 80
📥 处理入站数据包: [tcp] 203.0.113.100:10001 → 123.56.78.90:80
✅ 匹配 DNAT 规则: web-server
✅ [入站] DNAT 转换成功!
原始请求: 203.0.113.100:10001 → 123.56.78.90:80
转换请求: 203.0.113.100:10001 → 192.168.1.10:80
🖥️ 内网服务器 192.168.1.10:80 处理请求并返回响应...
📤 处理出站数据包: [tcp] 192.168.1.10:80 → 203.0.113.100:10001
✅ 命中已有连接 (dnat)
✅ [出站] 反向 DNAT 转换成功!
原始响应: 192.168.1.10:80 → 203.0.113.100:10001
转换响应: 123.56.78.90:80 → 203.0.113.100:10001
🎉 公网用户 public-user 成功收到内网服务响应!
(Gateway-Sim) show_rules
======================================================================
📋 NAT 规则列表 - nat-gw
======================================================================
--- SNAT 规则 ---
名称 源网段 转换IP 优先级 状态
--------------- -------------------- -------------------- -------- --------
internal-snat 192.168.1.0/24 123.56.78.90 100 启用
--- DNAT 规则 ---
名称 目的地址 转换地址 优先级 状态
--------------- ------------------------- ------------------------- -------- --------
web-server 123.56.78.90:80 192.168.1.10:80 100 启用
(Gateway-Sim) show_connections
======================================================================
📊 连接跟踪表 - nat-gw
======================================================================
源地址 原始目的 转换后目的 类型 状态
-------------------- -------------------- -------------------- -------- ------------
192.168.1.100:10001 180.101.50.24:80 180.101.50.24:80 snat ESTABLISHED
203.0.113.100:10001 123.56.78.90:80 192.168.1.10:80 dnat ESTABLISHED
(Gateway-Sim) show_stats
======================================================================
📈 统计信息 - nat-gw
======================================================================
已转换数据包: 4
已丢弃数据包: 0
SNAT 转换次数: 1
DNAT 转换次数: 1
活跃连接数: 2
历史连接总数: 2六、交互式演示
下面是一个交互式演示,帮助你直观理解完整 NAT 网关的工作原理:
🎮 动手试试:完整 NAT 网关演示
体验 SNAT + DNAT + 连接跟踪的完整工作流程
演示说明:
- 点击 「SNAT: 内网 → 公网」 按钮,观察内网主机如何访问公网
- 点击 「DNAT: 公网 → Web」 按钮,观察公网用户如何访问内网 Web 服务
- 点击 「DNAT: 公网 → DB」 按钮,观察公网用户如何访问内网数据库
- 点击 「完整流程」 按钮,理解 SNAT + DNAT + 连接跟踪的完整工作流程
- 注意观察连接跟踪表的变化
七、下一篇预告
在下一篇中,我们将学习:
- NAT 实战场景与故障排查
- 常见 NAT 问题及解决方案
- NAT 性能优化
- 云厂商 NAT 网关的最佳实践
