Mux 多路复用
Mux(多路复用)允许多个逻辑流共享单个传输连接。这减少了连接建立延迟和开销,特别是在高延迟传输上效果显著。
源码:common/mux/
架构
mermaid
flowchart LR
subgraph Client["客户端"]
S1[流 1]
S2[流 2]
S3[流 3]
CM[ClientManager]
CW[ClientWorker]
end
subgraph Transport["单个传输连接"]
Frames["Mux 帧<br/>(交错传输)"]
end
subgraph Server["服务端"]
SW[ServerWorker]
D1[分发 1]
D2[分发 2]
D3[分发 3]
end
S1 --> CM
S2 --> CM
S3 --> CM
CM --> CW
CW --> Frames
Frames --> SW
SW --> D1
SW --> D2
SW --> D3帧格式
+--+--+--+--+--+--+--+--+--+--+--+--+--+--+
| Meta Len | Session ID | Status | Option |
| (2B BE) | (2B BE) | (1B) | (1B) |
+--+--+--+--+--+--+--+--+--+--+--+--+--+--+
For SessionStatusNew:
+--+--+--+--+--+--+--+--+
| Network | Port | Addr |
| (1B) | (2B) | (var)|
+--+--+--+--+--+--+--+--+
Optional (XUDP): [8B GlobalID]
Data payload:
+--+--+--+--+
| Data Len | (2B BE, followed by payload bytes)
+--+--+--+--+元数据长度
前 2 字节编码后续元数据的长度(session ID + status + option + 目标地址)。
Session ID
2 字节会话标识符。每个逻辑流在连接中获得唯一的 ID。ID=0 配合 network=UDP 用于检测 XUDP。
会话状态
| 值 | 名称 | 描述 |
|---|---|---|
0x01 | New | 新会话(包含目标地址) |
0x02 | Keep | 继续现有会话(后续为数据) |
0x03 | End | 会话关闭 |
0x04 | KeepAlive | 连接保活(无数据) |
选项标志
| 位 | 名称 | 描述 |
|---|---|---|
0x01 | OptionData | 帧包含数据载荷 |
0x02 | OptionError | 会话以错误结束 |
目标网络
| 值 | 网络 |
|---|---|
0x01 | TCP |
0x02 | UDP |
地址格式
Port: 2 bytes big-endian
AddrType: 0x01=IPv4, 0x02=Domain, 0x03=IPv6
Address: 4B (IPv4), 1B+NB (domain len+domain), 16B (IPv6)客户端
ClientManager
出站侧 Mux 的入口:
go
type ClientManager struct {
Enabled bool
Picker WorkerPicker
}
func (m *ClientManager) Dispatch(ctx, link) error {
for i := 0; i < 16; i++ {
worker, _ := m.Picker.PickAvailable()
if worker.Dispatch(ctx, link) {
return nil
}
}
return errors.New("unable to find an available mux client")
}ClientWorker
每个 ClientWorker 管理一个传输连接上的多个会话:
go
type ClientWorker struct {
sessionManager *SessionManager // tracks active sessions
link transport.Link // transport connection
done *done.Instance
}当达到最大并发会话数(可配置,默认约 128)时,worker 被视为"已满"。
IncrementalWorkerPicker
按需创建新的 worker:
go
func (p *IncrementalWorkerPicker) PickAvailable() (*ClientWorker, error) {
p.access.Lock()
defer p.access.Unlock()
idx := p.findAvailable()
if idx >= 0 {
return p.workers[idx], nil
}
// All workers full: create a new one
worker, err := p.Factory.Create()
p.workers = append(p.workers, worker)
return worker, nil
}ClientWorker.Dispatch
go
func (m *ClientWorker) Dispatch(ctx, link) bool {
if m.IsFull() || m.Closed() {
return false
}
// Allocate session
s := m.sessionManager.Allocate()
// Build frame metadata
meta := FrameMetadata{
SessionID: s.ID,
SessionStatus: SessionStatusNew,
Target: outbound.Target,
Option: OptionData,
}
// Write frame to transport
// Copy data from link.Reader to transport (with framing)
// Copy data from transport to link.Writer (with deframing)
}服务端
ServerWorker
解复用传入帧并分发每个会话:
go
type ServerWorker struct {
dispatcher routing.Dispatcher
link *transport.Link
sessionManager *SessionManager
}
func NewServerWorker(ctx, dispatcher, link) (*ServerWorker, error) {
// Start worker goroutine
go worker.run(ctx)
return worker, nil
}帧处理循环
go
func (w *ServerWorker) run(ctx) {
for {
// Read frame metadata
meta := new(FrameMetadata)
meta.Unmarshal(reader)
switch meta.SessionStatus {
case SessionStatusNew:
// Create new session
// Dispatch via routing
s := w.sessionManager.Add(meta.SessionID)
link := dispatcher.Dispatch(ctx, meta.Target)
// Start copying between session and link
case SessionStatusKeep:
// Write data to existing session
s := w.sessionManager.Get(meta.SessionID)
// Read data payload and write to session
case SessionStatusEnd:
// Close session
w.sessionManager.Remove(meta.SessionID)
case SessionStatusKeepAlive:
// No-op, just prevents connection timeout
}
}
}会话管理器
go
type SessionManager struct {
sessions map[uint16]*Session
count int
closed bool
}
type Session struct {
input buf.Reader // data from transport
output buf.Writer // data to transport
parent *SessionManager
ID uint16
transferType protocol.TransferType
}数据帧封装
为现有会话写入数据时:
go
// Writer wraps each write in a mux frame:
frame = [2B meta_len][2B session_id][status=Keep][option=Data]
data = [2B data_len][payload]读取时,读取器解析帧并将数据路由到正确的会话。
Mux 配置
json
{
"outbounds": [{
"mux": {
"enabled": true,
"concurrency": 8, // max streams per connection
"xudpConcurrency": 16,
"xudpProxyUDP443": "reject"
}
}]
}实现要点
Session ID 空间:16 位,因此每个连接最多 65535 个会话。实际上并发量限制在约 128 以保证性能。
帧封装开销:每个数据帧增加 6-8 字节的元数据。对于小型 UDP 数据包,这一开销较为显著。
队头阻塞:所有会话共享一个 TCP 连接。如果一个会话卡住(如重传),所有会话都会受到影响。这是 Mux 的根本性权衡。
KeepAlive:
SessionStatusKeepAlive帧防止传输连接因空闲超时而关闭。在无数据流动时定期发送。Worker 生命周期:Worker 按需创建,每 30 秒定期清理。当 worker 的所有会话结束且没有新会话到达时,worker 会被关闭。
XUDP 检测:Session ID 0 + network UDP 表示 XUDP 模式。详见 XUDP 协议。