第二篇:DNAT 原理与端口映射实现
运行前提条件
- 系统:Linux 系统(Ubuntu 20.04+ / Debian 11+ / CentOS 8+ 均可),Windows 用户用 WSL2 也可以
- 权限:必须有 sudo /root 权限(操作内核网络模块需要)
- 环境:Python 3.8+,不需要装任何第三方库,纯标准库就能跑
- 基础:懂一点 Linux 基础命令和 Python 语法就行,零基础跟着步骤也能跑通
一、学习目标
- 理解 DNAT(Destination NAT) 的核心原理
- 掌握端口映射的工作机制
- 用 Python 代码实现完整的 DNAT 功能
- 理解 DNAT 与 SNAT 的区别和联系
二、DNAT 核心原理
2.1 什么是 DNAT?
DNAT(Destination Network Address Translation),即目的地址转换,是一种修改数据包目的 IP 地址的技术。
与 SNAT 的对比
| 特性 | SNAT | DNAT |
|---|---|---|
| 修改对象 | 源 IP 地址 | 目的 IP 地址 |
| 流量方向 | 内网 → 公网 | 公网 → 内网 |
| 主要用途 | 让内网能上网 | 让公网能访问内网服务 |
| 连接发起方 | 内网主机 | 公网用户 |
生活中的例子
想象你开了一家公司:
- 公司总机号:400-123-4567(公网 IP)
- 内部分机:101(Web 服务器)、102(数据库)、103(邮件服务器)
客户拨打公司总机,然后根据语音提示转接到对应的分机。
这就是 DNAT 的核心思想:用一个公网 IP 的不同端口,映射到不同的内网服务。
2.2 为什么需要 DNAT?
场景 1:发布内网服务
你有一个 Web 服务器在内网(192.168.1.100),想让公网用户访问。
问题:内网 IP 在公网上不可路由,公网用户无法直接访问。
解决方案:通过 DNAT 将公网 IP 的 80 端口映射到内网服务器的 80 端口。
场景 2:端口复用
你只有一个公网 IP,但有多个内网服务需要对外暴露。
解决方案:通过不同端口映射到不同服务。
场景 3:安全防护
直接将内网服务器暴露在公网上,会面临各种攻击。
解决方案:通过 DNAT + 安全组/ACL,在 NAT 网关层面做访问控制。
2.3 DNAT 工作流程
DNAT 修改的是数据包的目的 IP 地址,方向是公网 → 内网。
完整流程图
2.4 iptables 中的 DNAT 实现
在 Linux 中,DNAT 通过 iptables 的 nat 表实现:
# 语法
iptables -t nat -A PREROUTING -p <协议> --dport <公网端口> -j DNAT --to-destination <内网IP>:<内网端口>
# 示例:将公网 80 端口映射到内网 192.168.1.10:80
iptables -t nat -A PREROUTING -p tcp --dport 80 -j DNAT --to-destination 192.168.1.10:80
# 示例:将公网 8080 端口映射到内网 192.168.1.10:80(端口转换)
iptables -t nat -A PREROUTING -p tcp --dport 8080 -j DNAT --to-destination 192.168.1.10:80关键参数解析
| 参数 | 含义 |
|---|---|
-t nat | 操作 nat 表 |
-A PREROUTING | 在数据包进入本机时处理(修改目的 IP) |
-p tcp | 匹配 TCP 协议 |
--dport 80 | 匹配目的端口为 80 的数据包 |
-j DNAT | 执行 DNAT 转换 |
--to-destination 192.168.1.10:80 | 转换后的目的地址和端口 |
三、代码分段解析
现在我们用 Python 代码实现一个完整的 DNAT 功能。我们将模拟以下场景:
3.1 数据结构定义
做什么:定义数据包、DNAT 规则、转换表条目等核心数据结构。
为什么这么做:与 SNAT 类似,清晰的数据结构是实现 NAT 网关的基础。DNAT 特别需要记录规则,以便动态匹配入站连接。
import time
from dataclasses import dataclass, field
from typing import Dict, Optional, List, Tuple
from enum import Enum
# ============================================================
# 数据结构定义
# ============================================================
class ConnectionState(Enum):
"""连接状态"""
NEW = "NEW"
ESTABLISHED = "ESTABLISHED"
RELATED = "RELATED"
CLOSED = "CLOSED"
@dataclass
class Packet:
"""数据包"""
src_ip: str
dst_ip: str
src_port: int
dst_port: int
protocol: str = "tcp"
payload: str = ""
def __str__(self):
return f"[{self.protocol}] {self.src_ip}:{self.src_port} → {self.dst_ip}:{self.dst_port}"
@dataclass
class DNATRule:
"""DNAT 规则"""
name: str # 规则名称
public_ip: str # 公网 IP
public_port: int # 公网端口
private_ip: str # 内网 IP
private_port: int # 内网端口
protocol: str = "tcp" # 协议
enabled: bool = True # 是否启用
def matches(self, packet: Packet) -> bool:
"""检查数据包是否匹配此规则"""
return (
packet.dst_ip == self.public_ip and
packet.dst_port == self.public_port and
packet.protocol == self.protocol and
self.enabled
)
@dataclass
class DNATEntry:
"""DNAT 转换表条目"""
original_dst_ip: str # 原始目的 IP
original_dst_port: int # 原始目的端口
translated_dst_ip: str # 转换后的目的 IP
translated_dst_port: int # 转换后的目的端口
src_ip: str # 源 IP
src_port: int # 源端口
state: ConnectionState = ConnectionState.NEW
created_at: float = field(default_factory=time.time)
last_seen: float = field(default_factory=time.time)
@property
def original_key(self) -> str:
return f"{self.src_ip}:{self.src_port}-{self.original_dst_ip}:{self.original_dst_port}"
@property
def translated_key(self) -> str:
return f"{self.src_ip}:{self.src_port}-{self.translated_dst_ip}:{self.translated_dst_port}"关键要点:
DNATRule定义了端口映射的规则,比如将公网 80 端口映射到内网 80 端口。DNATEntry用于连接跟踪,记录连接的状态和转换关系,以便处理后续的数据包和回程响应。
3.2 DNAT 网关核心实现
做什么:实现 DNAT 网关的核心逻辑,包含入站和出站数据包的处理方法。
为什么这么做:这是 DNAT 的灵魂,主要处理公网用户主动发起的连接请求。
# ============================================================
# DNAT 网关实现
# ============================================================
class DNATGateway:
"""
DNAT 网关实现
功能:将公网 IP:端口 映射到内网 IP:端口
"""
def __init__(self, name: str, public_ip: str, private_ip: str):
"""
初始化 DNAT 网关
Args:
name: 网关名称
public_ip: 公网 IP
private_ip: 内网 IP(连接内网的接口)
"""
self.name = name
self.public_ip = public_ip
self.private_ip = private_ip
# DNAT 规则列表
self.dnat_rules: List[DNATRule] = []
# DNAT 转换表
self.dnat_table: Dict[str, DNATEntry] = {}
# 反向映射
self.reverse_table: Dict[str, str] = {}
# 统计信息
self.stats = {
'packets_translated': 0,
'packets_dropped': 0,
'active_connections': 0,
'rules_matched': 0
}
print(f"✅ DNAT 网关 [{name}] 创建成功")
print(f" 公网 IP: {public_ip}")
print(f" 内网 IP: {private_ip}")
def add_rule(self, rule: DNATRule):
"""添加 DNAT 规则"""
self.dnat_rules.append(rule)
print(f"✅ 添加 DNAT 规则: {rule.name}")
print(f" {rule.public_ip}:{rule.public_port} → {rule.private_ip}:{rule.private_port}")
def remove_rule(self, rule_name: str):
"""移除 DNAT 规则"""
self.dnat_rules = [r for r in self.dnat_rules if r.name != rule_name]
print(f"✅ 移除 DNAT 规则: {rule_name}")
def find_matching_rule(self, packet: Packet) -> Optional[DNATRule]:
"""查找匹配的 DNAT 规则"""
for rule in self.dnat_rules:
if rule.matches(packet):
return rule
return None
def translate_inbound(self, packet: Packet) -> Optional[Packet]:
"""
处理入站数据包(公网 → 内网)
Args:
packet: 来自公网的数据包
Returns:
转换后的数据包,如果没有匹配的规则则返回 None
"""
print(f"\n📥 处理入站数据包: {packet}")
# 查找匹配的 DNAT 规则
rule = self.find_matching_rule(packet)
if not rule:
print(f" ❌ 没有匹配的 DNAT 规则,丢弃数据包")
self.stats['packets_dropped'] += 1
return None
self.stats['rules_matched'] += 1
print(f" ✅ 匹配规则: {rule.name}")
# 检查是否已有连接条目
entry_key = f"{packet.src_ip}:{packet.src_port}-{packet.dst_ip}:{packet.dst_port}"
if entry_key in self.dnat_table:
# 已有条目,更新状态
entry = self.dnat_table[entry_key]
entry.last_seen = time.time()
entry.state = ConnectionState.ESTABLISHED
print(f" ✅ 命中已有连接条目")
else:
# 新建条目
entry = DNATEntry(
original_dst_ip=packet.dst_ip,
original_dst_port=packet.dst_port,
translated_dst_ip=rule.private_ip,
translated_dst_port=rule.private_port,
src_ip=packet.src_ip,
src_port=packet.src_port,
state=ConnectionState.NEW
)
self.dnat_table[entry_key] = entry
self.reverse_table[entry.translated_key] = entry_key
self.stats['active_connections'] += 1
print(f" ✅ 新建连接条目:")
print(f" {packet.dst_ip}:{packet.dst_port} → {rule.private_ip}:{rule.private_port}")
# 创建转换后的数据包
translated_packet = Packet(
src_ip=packet.src_ip,
dst_ip=rule.private_ip,
src_port=packet.src_port,
dst_port=rule.private_port,
protocol=packet.protocol,
payload=packet.payload
)
self.stats['packets_translated'] += 1
print(f" 📦 转换后: {translated_packet}")
return translated_packet
def translate_outbound(self, packet: Packet) -> Optional[Packet]:
"""
处理出站数据包(内网 → 公网)
Args:
packet: 来自内网的数据包
Returns:
转换后的数据包,如果没有匹配的连接则返回 None
"""
print(f"\n📤 处理出站数据包: {packet}")
# 查找反向映射
reverse_key = f"{packet.dst_ip}:{packet.dst_port}-{packet.src_ip}:{packet.src_port}"
if reverse_key not in self.reverse_table:
# 没有匹配的连接,可能是内网主动发起的连接
# 这种情况应该由 SNAT 处理,不是 DNAT 的职责
print(f" ⚠️ 没有匹配的 DNAT 连接,跳过")
return packet
# 获取原始连接信息
original_key = self.reverse_table[reverse_key]
entry = self.dnat_table[original_key]
# 更新状态
entry.last_seen = time.time()
entry.state = ConnectionState.ESTABLISHED
# 创建转换后的数据包
translated_packet = Packet(
src_ip=self.public_ip,
dst_ip=packet.dst_ip,
src_port=packet.src_port,
dst_port=packet.dst_port,
protocol=packet.protocol,
payload=packet.payload
)
print(f" ✅ 反向 DNAT 转换:")
print(f" 源IP: {packet.src_ip} → {self.public_ip}")
return translated_packet
def show_rules(self):
"""显示所有 DNAT 规则"""
print(f"\n{'='*60}")
print(f"📋 DNAT 规则列表 - {self.name}")
print(f"{'='*60}")
if not self.dnat_rules:
print(" (空)")
return
print(f"{'规则名称':<15} {'公网地址':<20} {'内网地址':<20} {'状态':<8}")
print(f"{'-'*15} {'-'*20} {'-'*20} {'-'*8}")
for rule in self.dnat_rules:
public = f"{rule.public_ip}:{rule.public_port}"
private = f"{rule.private_ip}:{rule.private_port}"
status = "启用" if rule.enabled else "禁用"
print(f"{rule.name:<15} {public:<20} {private:<20} {status:<8}")
def show_connections(self):
"""显示连接表"""
print(f"\n{'='*60}")
print(f"📊 DNAT 连接表 - {self.name}")
print(f"{'='*60}")
if not self.dnat_table:
print(" (空)")
return
print(f"{'源地址':<20} {'原始目的地址':<20} {'转换后目的地址':<20} {'状态':<12}")
print(f"{'-'*20} {'-'*20} {'-'*20} {'-'*12}")
for entry in self.dnat_table.values():
src = f"{entry.src_ip}:{entry.src_port}"
original = f"{entry.original_dst_ip}:{entry.original_dst_port}"
translated = f"{entry.translated_dst_ip}:{entry.translated_dst_port}"
state = entry.state.value
print(f"{src:<20} {original:<20} {translated:<20} {state:<12}")
def show_stats(self):
"""显示统计信息"""
print(f"\n{'='*60}")
print(f"📈 统计信息 - {self.name}")
print(f"{'='*60}")
print(f" 已转换数据包: {self.stats['packets_translated']}")
print(f" 已丢弃数据包: {self.stats['packets_dropped']}")
print(f" 活跃连接数: {self.stats['active_connections']}")
print(f" 规则匹配次数: {self.stats['rules_matched']}")关键要点:
translate_inbound:处理入站流量,将数据包的目的 IP 和目的端口替换为规则中定义的内网地址。translate_outbound:处理出站流量,这是针对入站连接的回程响应,修改源 IP 伪装成 NAT 网关。
3.3 模拟网络设备
做什么:定义一个简单的主机类,用于模拟发包和收包。
为什么这么做:方便我们在内存中演示端到端的通信过程。
# ============================================================
# 模拟网络设备
# ============================================================
class Host:
"""模拟网络主机"""
def __init__(self, name: str, ip: str):
self.name = name
self.ip = ip
self.port_counter = 10000
print(f"✅ 主机 [{name}] 创建成功,IP: {ip}")
def create_packet(self, dst_ip: str, dst_port: int, payload: str = "") -> Packet:
"""创建一个数据包"""
self.port_counter += 1
return Packet(
src_ip=self.ip,
dst_ip=dst_ip,
src_port=self.port_counter,
dst_port=dst_port,
payload=payload
)
def receive_packet(self, packet: Packet):
"""接收一个数据包"""
print(f" 📥 [{self.name}] 收到数据包: {packet}")
if packet.payload:
print(f" 负载: {packet.payload}")3.4 交互式 CLI Shell
做什么:用 Python 自带的 cmd 模块构建一个类似网络设备路由器的交互式命令行界面,替代硬编码的演示逻辑。
为什么这么做:让读者可以在终端里自由、动态地创建网络资源并验证连通性,带来真实的沉浸感操作体验。
import cmd
import shlex
class DNATShell(cmd.Cmd):
intro = '===========================================================\n' \
'欢迎使用 DNAT 交互式模拟器\n' \
'【可用指令】:\n' \
' 1. create_gateway <name> <public_ip> <private_ip> - 创建 NAT 网关\n' \
' 2. create_host <name> <ip> - 创建主机\n' \
' 3. add_dnat <name> <pub_ip> <pub_port> <priv_ip> <priv_port> - 添加 DNAT 规则\n' \
' 4. dnat <host_name> <dst_ip> <dst_port> - 测试 DNAT(公网→内网)\n' \
' 5. show_rules - 查看 DNAT 规则\n' \
' 6. show_connections - 查看连接表\n' \
' 7. reset - 重置所有资源\n' \
' 8. exit - 退出\n' \
'==========================================================='
prompt = '(DNAT-Sim) '
def emptyline(self):
pass
def do_create_gateway(self, arg):
"""创建 NAT 网关"""
args = shlex.split(arg)
if len(args) != 3: return print("❌ 用法: create_gateway <name> <public_ip> <private_ip>")
try:
ALL_GATEWAYS[args[0]] = DNATGateway(args[0], public_ip=args[1], private_ip=args[2])
except Exception as e: print(f"❌ 创建失败: {e}")
def do_create_host(self, arg):
"""创建主机"""
args = shlex.split(arg)
if len(args) != 2: return print("❌ 用法: create_host <name> <ip>")
try:
ALL_HOSTS[args[0]] = Host(args[0], args[1])
except Exception as e: print(f"❌ 创建失败: {e}")
def do_add_dnat(self, arg):
"""添加 DNAT 规则"""
args = shlex.split(arg)
if len(args) != 5: return print("❌ 用法: add_dnat <name> <pub_ip> <pub_port> <priv_ip> <priv_port>")
if not ALL_GATEWAYS: return print("❌ 没有可用的 NAT 网关")
gateway = list(ALL_GATEWAYS.values())[0]
rule = DNATRule(args[0], args[1], int(args[2]), args[3], int(args[4]))
gateway.add_rule(rule)
print(f"✅ 成功添加 DNAT 规则: {args[1]}:{args[2]} → {args[3]}:{args[4]}")
def do_dnat(self, arg):
"""测试 DNAT"""
args = shlex.split(arg)
if len(args) < 3: return print("❌ 用法: dnat <host_name> <dst_ip> <dst_port>")
host_name, dst_ip, dst_port = args[0], args[1], int(args[2])
host = ALL_HOSTS.get(host_name)
if not host: return print(f"❌ 主机 {host_name} 不存在")
if not ALL_GATEWAYS: return print("❌ 没有可用的 NAT 网关")
gateway = list(ALL_GATEWAYS.values())[0]
request = host.create_packet(dst_ip, dst_port, f"Request from {host_name}")
translated_request = gateway.translate_inbound(request)
if translated_request:
print(f"\n✅ [入站] DNAT 转换成功!")
print(f" 原始请求: {host.ip}:{request.src_port} → {dst_ip}:{dst_port}")
print(f" 转换请求: {translated_request.src_ip}:{translated_request.src_port} → {translated_request.dst_ip}:{translated_request.dst_port}")
# 模拟内网服务器返回响应
response = Packet(
src_ip=translated_request.dst_ip, dst_ip=translated_request.src_ip,
src_port=translated_request.dst_port, dst_port=translated_request.src_port,
payload=f"Response from internal server"
)
print(f"\n🖥️ 内网服务器 {translated_request.dst_ip}:{translated_request.dst_port} 处理请求并返回响应...")
# NAT 网关处理出站响应
translated_response = gateway.translate_outbound(response)
if translated_response:
print(f"✅ [出站] 反向 DNAT 转换成功!")
print(f" 原始响应: {response.src_ip}:{response.src_port} → {response.dst_ip}:{response.dst_port}")
print(f" 转换响应: {translated_response.src_ip}:{translated_response.src_port} → {translated_response.dst_ip}:{translated_response.dst_port}")
print(f"🎉 公网用户 {host_name} 成功收到内网服务响应!")
else:
print(f"❌ [出站] 反向 DNAT 转换失败(无法找到连接记录)")
else:
print(f"\n❌ [入站] DNAT 转换失败(可能没有匹配的规则)")
def do_show_rules(self, arg):
if not ALL_GATEWAYS: return print("❌ 没有可用的 NAT 网关")
for gw in ALL_GATEWAYS.values(): gw.show_rules()
def do_show_connections(self, arg):
if not ALL_GATEWAYS: return print("❌ 没有可用的 NAT 网关")
for gw in ALL_GATEWAYS.values(): gw.show_connections()
def do_reset(self, arg):
ALL_GATEWAYS.clear(); ALL_HOSTS.clear()
print("✅ 所有资源已重置")
def do_exit(self, arg):
print("👋 再见!")
return True
if __name__ == "__main__":
DNATShell().cmdloop()一键体验配置:
将以下指令粘贴进模拟器,可以直接体验 DNAT 端口映射核心功能:
create_gateway nat-gw 123.56.78.90 10.0.0.1
create_host web-server 192.168.1.10
create_host public-user 203.0.113.100
add_dnat web 123.56.78.90 80 192.168.1.10 80
dnat public-user 123.56.78.90 80
show_rules
show_connections四、完整可运行代码
将上面所有代码片段合并,保存为 nat_part2_dnat.py 即可运行。完整代码如下:
#!/usr/bin/env python3
"""
DNAT 模拟器 - 深入理解目的地址转换的工作原理
用纯 Python 实现一个简化的 DNAT 网关
"""
import time
from dataclasses import dataclass, field
from typing import Dict, Optional, List, Tuple
from enum import Enum
# ============================================================
# 数据结构定义
# ============================================================
class ConnectionState(Enum):
"""连接状态"""
NEW = "NEW"
ESTABLISHED = "ESTABLISHED"
RELATED = "RELATED"
CLOSED = "CLOSED"
@dataclass
class Packet:
"""数据包"""
src_ip: str
dst_ip: str
src_port: int
dst_port: int
protocol: str = "tcp"
payload: str = ""
def __str__(self):
return f"[{self.protocol}] {self.src_ip}:{self.src_port} → {self.dst_ip}:{self.dst_port}"
@dataclass
class DNATRule:
"""DNAT 规则"""
name: str # 规则名称
public_ip: str # 公网 IP
public_port: int # 公网端口
private_ip: str # 内网 IP
private_port: int # 内网端口
protocol: str = "tcp" # 协议
enabled: bool = True # 是否启用
def matches(self, packet: Packet) -> bool:
"""检查数据包是否匹配此规则"""
return (
packet.dst_ip == self.public_ip and
packet.dst_port == self.public_port and
packet.protocol == self.protocol and
self.enabled
)
@dataclass
class DNATEntry:
"""DNAT 转换表条目"""
original_dst_ip: str # 原始目的 IP
original_dst_port: int # 原始目的端口
translated_dst_ip: str # 转换后的目的 IP
translated_dst_port: int # 转换后的目的端口
src_ip: str # 源 IP
src_port: int # 源端口
state: ConnectionState = ConnectionState.NEW
created_at: float = field(default_factory=time.time)
last_seen: float = field(default_factory=time.time)
@property
def original_key(self) -> str:
return f"{self.src_ip}:{self.src_port}-{self.original_dst_ip}:{self.original_dst_port}"
@property
def translated_key(self) -> str:
return f"{self.src_ip}:{self.src_port}-{self.translated_dst_ip}:{self.translated_dst_port}"
# ============================================================
# DNAT 网关实现
# ============================================================
class DNATGateway:
"""
DNAT 网关实现
功能:将公网 IP:端口 映射到内网 IP:端口
"""
def __init__(self, name: str, public_ip: str, private_ip: str):
"""
初始化 DNAT 网关
Args:
name: 网关名称
public_ip: 公网 IP
private_ip: 内网 IP(连接内网的接口)
"""
self.name = name
self.public_ip = public_ip
self.private_ip = private_ip
# DNAT 规则列表
self.dnat_rules: List[DNATRule] = []
# DNAT 转换表
self.dnat_table: Dict[str, DNATEntry] = {}
# 反向映射
self.reverse_table: Dict[str, str] = {}
# 统计信息
self.stats = {
'packets_translated': 0,
'packets_dropped': 0,
'active_connections': 0,
'rules_matched': 0
}
print(f"✅ DNAT 网关 [{name}] 创建成功")
print(f" 公网 IP: {public_ip}")
print(f" 内网 IP: {private_ip}")
def add_rule(self, rule: DNATRule):
"""添加 DNAT 规则"""
self.dnat_rules.append(rule)
print(f"✅ 添加 DNAT 规则: {rule.name}")
print(f" {rule.public_ip}:{rule.public_port} → {rule.private_ip}:{rule.private_port}")
def remove_rule(self, rule_name: str):
"""移除 DNAT 规则"""
self.dnat_rules = [r for r in self.dnat_rules if r.name != rule_name]
print(f"✅ 移除 DNAT 规则: {rule_name}")
def find_matching_rule(self, packet: Packet) -> Optional[DNATRule]:
"""查找匹配的 DNAT 规则"""
for rule in self.dnat_rules:
if rule.matches(packet):
return rule
return None
def translate_inbound(self, packet: Packet) -> Optional[Packet]:
"""
处理入站数据包(公网 → 内网)
Args:
packet: 来自公网的数据包
Returns:
转换后的数据包,如果没有匹配的规则则返回 None
"""
print(f"\n📥 处理入站数据包: {packet}")
# 查找匹配的 DNAT 规则
rule = self.find_matching_rule(packet)
if not rule:
print(f" ❌ 没有匹配的 DNAT 规则,丢弃数据包")
self.stats['packets_dropped'] += 1
return None
self.stats['rules_matched'] += 1
print(f" ✅ 匹配规则: {rule.name}")
# 检查是否已有连接条目
entry_key = f"{packet.src_ip}:{packet.src_port}-{packet.dst_ip}:{packet.dst_port}"
if entry_key in self.dnat_table:
# 已有条目,更新状态
entry = self.dnat_table[entry_key]
entry.last_seen = time.time()
entry.state = ConnectionState.ESTABLISHED
print(f" ✅ 命中已有连接条目")
else:
# 新建条目
entry = DNATEntry(
original_dst_ip=packet.dst_ip,
original_dst_port=packet.dst_port,
translated_dst_ip=rule.private_ip,
translated_dst_port=rule.private_port,
src_ip=packet.src_ip,
src_port=packet.src_port,
state=ConnectionState.NEW
)
self.dnat_table[entry_key] = entry
self.reverse_table[entry.translated_key] = entry_key
self.stats['active_connections'] += 1
print(f" ✅ 新建连接条目:")
print(f" {packet.dst_ip}:{packet.dst_port} → {rule.private_ip}:{rule.private_port}")
# 创建转换后的数据包
translated_packet = Packet(
src_ip=packet.src_ip,
dst_ip=rule.private_ip,
src_port=packet.src_port,
dst_port=rule.private_port,
protocol=packet.protocol,
payload=packet.payload
)
self.stats['packets_translated'] += 1
print(f" 📦 转换后: {translated_packet}")
return translated_packet
def translate_outbound(self, packet: Packet) -> Optional[Packet]:
"""
处理出站数据包(内网 → 公网)
Args:
packet: 来自内网的数据包
Returns:
转换后的数据包,如果没有匹配的连接则返回 None
"""
print(f"\n📤 处理出站数据包: {packet}")
# 查找反向映射
reverse_key = f"{packet.dst_ip}:{packet.dst_port}-{packet.src_ip}:{packet.src_port}"
if reverse_key not in self.reverse_table:
# 没有匹配的连接,可能是内网主动发起的连接
# 这种情况应该由 SNAT 处理,不是 DNAT 的职责
print(f" ⚠️ 没有匹配的 DNAT 连接,跳过")
return packet
# 获取原始连接信息
original_key = self.reverse_table[reverse_key]
entry = self.dnat_table[original_key]
# 更新状态
entry.last_seen = time.time()
entry.state = ConnectionState.ESTABLISHED
# 创建转换后的数据包
translated_packet = Packet(
src_ip=self.public_ip,
dst_ip=packet.dst_ip,
src_port=packet.src_port,
dst_port=packet.dst_port,
protocol=packet.protocol,
payload=packet.payload
)
print(f" ✅ 反向 DNAT 转换:")
print(f" 源IP: {packet.src_ip} → {self.public_ip}")
return translated_packet
def show_rules(self):
"""显示所有 DNAT 规则"""
print(f"\n{'='*60}")
print(f"📋 DNAT 规则列表 - {self.name}")
print(f"{'='*60}")
if not self.dnat_rules:
print(" (空)")
return
print(f"{'规则名称':<15} {'公网地址':<20} {'内网地址':<20} {'状态':<8}")
print(f"{'-'*15} {'-'*20} {'-'*20} {'-'*8}")
for rule in self.dnat_rules:
public = f"{rule.public_ip}:{rule.public_port}"
private = f"{rule.private_ip}:{rule.private_port}"
status = "启用" if rule.enabled else "禁用"
print(f"{rule.name:<15} {public:<20} {private:<20} {status:<8}")
def show_connections(self):
"""显示连接表"""
print(f"\n{'='*60}")
print(f"📊 DNAT 连接表 - {self.name}")
print(f"{'='*60}")
if not self.dnat_table:
print(" (空)")
return
print(f"{'源地址':<20} {'原始目的地址':<20} {'转换后目的地址':<20} {'状态':<12}")
print(f"{'-'*20} {'-'*20} {'-'*20} {'-'*12}")
for entry in self.dnat_table.values():
src = f"{entry.src_ip}:{entry.src_port}"
original = f"{entry.original_dst_ip}:{entry.original_dst_port}"
translated = f"{entry.translated_dst_ip}:{entry.translated_dst_port}"
state = entry.state.value
print(f"{src:<20} {original:<20} {translated:<20} {state:<12}")
def show_stats(self):
"""显示统计信息"""
print(f"\n{'='*60}")
print(f"📈 统计信息 - {self.name}")
print(f"{'='*60}")
print(f" 已转换数据包: {self.stats['packets_translated']}")
print(f" 已丢弃数据包: {self.stats['packets_dropped']}")
print(f" 活跃连接数: {self.stats['active_connections']}")
print(f" 规则匹配次数: {self.stats['rules_matched']}")
# ============================================================
# 模拟网络设备
# ============================================================
class Host:
"""模拟网络主机"""
def __init__(self, name: str, ip: str):
self.name = name
self.ip = ip
self.port_counter = 10000
print(f"✅ 主机 [{name}] 创建成功,IP: {ip}")
def create_packet(self, dst_ip: str, dst_port: int, payload: str = "") -> Packet:
"""创建一个数据包"""
self.port_counter += 1
return Packet(
src_ip=self.ip,
dst_ip=dst_ip,
src_port=self.port_counter,
dst_port=dst_port,
payload=payload
)
def receive_packet(self, packet: Packet):
"""接收一个数据包"""
print(f" 📥 [{self.name}] 收到数据包: {packet}")
if packet.payload:
print(f" 负载: {packet.payload}")
# ============================================================
# 交互式 CLI Shell
# ============================================================
import cmd
import shlex
# 全局资源
ALL_GATEWAYS: Dict[str, DNATGateway] = {}
ALL_HOSTS: Dict[str, Host] = {}
class DNATShell(cmd.Cmd):
intro = '===========================================================\n' \
'欢迎使用 DNAT 交互式模拟器\n' \
'【可用指令】:\n' \
' 1. create_gateway <name> <public_ip> <private_ip> - 创建 NAT 网关\n' \
' 2. create_host <name> <ip> - 创建主机\n' \
' 3. add_dnat <name> <pub_ip> <pub_port> <priv_ip> <priv_port> - 添加 DNAT 规则\n' \
' 4. dnat <host_name> <dst_ip> <dst_port> - 测试 DNAT(公网→内网)\n' \
' 5. show_rules - 查看 DNAT 规则\n' \
' 6. show_connections - 查看连接表\n' \
' 7. reset - 重置所有资源\n' \
' 8. exit - 退出\n' \
'==========================================================='
prompt = '(DNAT-Sim) '
def emptyline(self):
pass
def do_create_gateway(self, arg):
"""创建 NAT 网关"""
args = shlex.split(arg)
if len(args) != 3: return print("❌ 用法: create_gateway <name> <public_ip> <private_ip>")
try:
ALL_GATEWAYS[args[0]] = DNATGateway(args[0], public_ip=args[1], private_ip=args[2])
except Exception as e: print(f"❌ 创建失败: {e}")
def do_create_host(self, arg):
"""创建主机"""
args = shlex.split(arg)
if len(args) != 2: return print("❌ 用法: create_host <name> <ip>")
try:
ALL_HOSTS[args[0]] = Host(args[0], args[1])
except Exception as e: print(f"❌ 创建失败: {e}")
def do_add_dnat(self, arg):
"""添加 DNAT 规则"""
args = shlex.split(arg)
if len(args) != 5: return print("❌ 用法: add_dnat <name> <pub_ip> <pub_port> <priv_ip> <priv_port>")
if not ALL_GATEWAYS: return print("❌ 没有可用的 NAT 网关")
gateway = list(ALL_GATEWAYS.values())[0]
rule = DNATRule(args[0], args[1], int(args[2]), args[3], int(args[4]))
gateway.add_rule(rule)
print(f"✅ 成功添加 DNAT 规则: {args[1]}:{args[2]} → {args[3]}:{args[4]}")
def do_dnat(self, arg):
"""测试 DNAT"""
args = shlex.split(arg)
if len(args) < 3: return print("❌ 用法: dnat <host_name> <dst_ip> <dst_port>")
host_name, dst_ip, dst_port = args[0], args[1], int(args[2])
host = ALL_HOSTS.get(host_name)
if not host: return print(f"❌ 主机 {host_name} 不存在")
if not ALL_GATEWAYS: return print("❌ 没有可用的 NAT 网关")
gateway = list(ALL_GATEWAYS.values())[0]
request = host.create_packet(dst_ip, dst_port, f"Request from {host_name}")
translated_request = gateway.translate_inbound(request)
if translated_request:
print(f"\n✅ [入站] DNAT 转换成功!")
print(f" 原始请求: {host.ip}:{request.src_port} → {dst_ip}:{dst_port}")
print(f" 转换请求: {translated_request.src_ip}:{translated_request.src_port} → {translated_request.dst_ip}:{translated_request.dst_port}")
# 模拟内网服务器返回响应
response = Packet(
src_ip=translated_request.dst_ip, dst_ip=translated_request.src_ip,
src_port=translated_request.dst_port, dst_port=translated_request.src_port,
payload=f"Response from internal server"
)
print(f"\n🖥️ 内网服务器 {translated_request.dst_ip}:{translated_request.dst_port} 处理请求并返回响应...")
# NAT 网关处理出站响应
translated_response = gateway.translate_outbound(response)
if translated_response:
print(f"✅ [出站] 反向 DNAT 转换成功!")
print(f" 原始响应: {response.src_ip}:{response.src_port} → {response.dst_ip}:{response.dst_port}")
print(f" 转换响应: {translated_response.src_ip}:{translated_response.src_port} → {translated_response.dst_ip}:{translated_response.dst_port}")
print(f"🎉 公网用户 {host_name} 成功收到内网服务响应!")
else:
print(f"❌ [出站] 反向 DNAT 转换失败(无法找到连接记录)")
else:
print(f"\n❌ [入站] DNAT 转换失败(可能没有匹配的规则)")
def do_show_rules(self, arg):
if not ALL_GATEWAYS: return print("❌ 没有可用的 NAT 网关")
for gw in ALL_GATEWAYS.values(): gw.show_rules()
def do_show_connections(self, arg):
if not ALL_GATEWAYS: return print("❌ 没有可用的 NAT 网关")
for gw in ALL_GATEWAYS.values(): gw.show_connections()
def do_reset(self, arg):
ALL_GATEWAYS.clear(); ALL_HOSTS.clear()
print("✅ 所有资源已重置")
def do_exit(self, arg):
print("👋 再见!")
return True
if __name__ == "__main__":
DNATShell().cmdloop()五、运行方法
- 将代码保存为
nat_part2_dnat.py - 执行:
python3 nat_part2_dnat.py- 运行结果
root@ubuntu:~# python3 nat_part2_dnat.py
===========================================================
欢迎使用 DNAT 交互式模拟器
【可用指令】:
1. create_gateway <name> <public_ip> <private_ip> - 创建 NAT 网关
2. create_host <name> <ip> - 创建主机
3. add_dnat <name> <pub_ip> <pub_port> <priv_ip> <priv_port> - 添加 DNAT 规则
4. dnat <host_name> <dst_ip> <dst_port> - 测试 DNAT(公网→内网)
5. show_rules - 查看 DNAT 规则
6. show_connections - 查看连接表
7. reset - 重置所有资源
8. exit - 退出
===========================================================
(DNAT-Sim) create_gateway nat-gw 123.56.78.90 10.0.0.1
✅ DNAT 网关 [nat-gw] 创建成功
公网 IP: 123.56.78.90
内网 IP: 10.0.0.1
(DNAT-Sim) create_host web-server 192.168.1.10
✅ 主机 [web-server] 创建成功,IP: 192.168.1.10
(DNAT-Sim) create_host public-user 203.0.113.100
✅ 主机 [public-user] 创建成功,IP: 203.0.113.100
(DNAT-Sim) add_dnat web 123.56.78.90 80 192.168.1.10 80
✅ 添加 DNAT 规则: web
123.56.78.90:80 → 192.168.1.10:80
✅ 成功添加 DNAT 规则: 123.56.78.90:80 → 192.168.1.10:80
(DNAT-Sim) dnat public-user 123.56.78.90 80
📥 处理入站数据包: [tcp] 203.0.113.100:10001 → 123.56.78.90:80
✅ 匹配规则: web
✅ 新建连接条目:
123.56.78.90:80 → 192.168.1.10:80
📦 转换后: [tcp] 203.0.113.100:10001 → 192.168.1.10:80
✅ [入站] DNAT 转换成功!
原始请求: 203.0.113.100:10001 → 123.56.78.90:80
转换请求: 203.0.113.100:10001 → 192.168.1.10:80
🖥️ 内网服务器 192.168.1.10:80 处理请求并返回响应...
📤 处理出站数据包: [tcp] 192.168.1.10:80 → 203.0.113.100:10001
✅ 反向 DNAT 转换:
源IP: 192.168.1.10 → 123.56.78.90
✅ [出站] 反向 DNAT 转换成功!
原始响应: 192.168.1.10:80 → 203.0.113.100:10001
转换响应: 123.56.78.90:80 → 203.0.113.100:10001
🎉 公网用户 public-user 成功收到内网服务响应!
(DNAT-Sim) show_rules
============================================================
📋 DNAT 规则列表 - nat-gw
============================================================
规则名称 公网地址 内网地址 状态
--------------- -------------------- -------------------- --------
web 123.56.78.90:80 192.168.1.10:80 启用
(DNAT-Sim) show_connections
============================================================
📊 DNAT 连接表 - nat-gw
============================================================
源地址 原始目的地址 转换后目的地址 状态
-------------------- -------------------- -------------------- ------------
203.0.113.100:10001 123.56.78.90:80 192.168.1.10:80 ESTABLISHED一键体验配置:
将以下指令粘贴进模拟器,可以直接体验 DNAT 核心功能:
create_gateway nat-gw 123.56.78.90 192.168.1.1
create_host web 192.168.1.10
create_host db 192.168.1.11
create_host public-user 203.0.113.100
add_dnat web 123.56.78.90 80 192.168.1.10 80
add_dnat db 123.56.78.90 3306 192.168.1.11 3306
dnat public-user 123.56.78.90 80
dnat public-user 123.56.78.90 3306
show_rules
show_connections六、交互式演示
下面是一个交互式演示,帮助你直观理解 DNAT 端口映射的工作原理:
🎮 动手试试:DNAT 端口映射演示
体验 DNAT(目的地址转换)的工作原理:公网用户如何通过 NAT 网关访问内网服务
演示说明:
- 点击 「DNAT: 公网 → Web」 按钮,观察公网用户如何访问内网 Web 服务
- 点击 「DNAT: 公网 → DB」 按钮,观察公网用户如何访问内网数据库
- 注意观察数据包的目的 IP 和端口是如何被转换的
常见问题
Q1: 为什么DNAT需要连接跟踪
A: DNAT 需要连接跟踪来在反向流量(内网 → 公网)到达时,能够识别出对应的连接并将其目的地址还原回公网 IP。
如果没有连接跟踪,当内网服务器返回响应时,NAT 网关无法知道这个响应是属于哪个连接的,也就无法将其目的地址从 192.168.1.10 转换回公网 IP 123.56.78.90。
Q2: DNAT 支持同时映射多个服务吗
A: 是的,DNAT 支持同时映射多个服务。只要每个服务的端口不同,就可以通过不同的端口号将不同的服务映射到不同的内网服务器上。
Q3: DNAT 会影响内网主机的通信吗
A: DNAT 主要用于入站流量(公网 → 内网),不会影响内网主机之间的通信。但是,如果内网主机需要访问公网服务,则需要配置 SNAT 规则。
八、下一篇预告
在下一篇中,我们将学习:
- NAT 网关的完整实现
- 连接跟踪(Conntrack)的深入原理
- NAT 穿透技术
- 生产环境中的 NAT 最佳实践
