Skip to content

Mux 多路复用

Mux(多路复用)允许多个逻辑流共享单个传输连接。这减少了连接建立延迟和开销,特别是在高延迟传输上效果显著。

源码common/mux/

架构

mermaid
flowchart LR
    subgraph Client["客户端"]
        S1[流 1]
        S2[流 2]
        S3[流 3]
        CM[ClientManager]
        CW[ClientWorker]
    end

    subgraph Transport["单个传输连接"]
        Frames["Mux 帧<br/>(交错传输)"]
    end

    subgraph Server["服务端"]
        SW[ServerWorker]
        D1[分发 1]
        D2[分发 2]
        D3[分发 3]
    end

    S1 --> CM
    S2 --> CM
    S3 --> CM
    CM --> CW
    CW --> Frames
    Frames --> SW
    SW --> D1
    SW --> D2
    SW --> D3

帧格式

+--+--+--+--+--+--+--+--+--+--+--+--+--+--+
| Meta Len  | Session ID | Status | Option |
| (2B BE)   | (2B BE)    | (1B)   | (1B)   |
+--+--+--+--+--+--+--+--+--+--+--+--+--+--+

For SessionStatusNew:
+--+--+--+--+--+--+--+--+
| Network | Port  | Addr |
| (1B)    | (2B)  | (var)|
+--+--+--+--+--+--+--+--+

Optional (XUDP): [8B GlobalID]

Data payload:
+--+--+--+--+
| Data Len  |  (2B BE, followed by payload bytes)
+--+--+--+--+

元数据长度

前 2 字节编码后续元数据的长度(session ID + status + option + 目标地址)。

Session ID

2 字节会话标识符。每个逻辑流在连接中获得唯一的 ID。ID=0 配合 network=UDP 用于检测 XUDP。

会话状态

名称描述
0x01New新会话(包含目标地址)
0x02Keep继续现有会话(后续为数据)
0x03End会话关闭
0x04KeepAlive连接保活(无数据)

选项标志

名称描述
0x01OptionData帧包含数据载荷
0x02OptionError会话以错误结束

目标网络

网络
0x01TCP
0x02UDP

地址格式

Port:     2 bytes big-endian
AddrType: 0x01=IPv4, 0x02=Domain, 0x03=IPv6
Address:  4B (IPv4), 1B+NB (domain len+domain), 16B (IPv6)

客户端

ClientManager

出站侧 Mux 的入口:

go
type ClientManager struct {
    Enabled bool
    Picker  WorkerPicker
}

func (m *ClientManager) Dispatch(ctx, link) error {
    for i := 0; i < 16; i++ {
        worker, _ := m.Picker.PickAvailable()
        if worker.Dispatch(ctx, link) {
            return nil
        }
    }
    return errors.New("unable to find an available mux client")
}

ClientWorker

每个 ClientWorker 管理一个传输连接上的多个会话:

go
type ClientWorker struct {
    sessionManager *SessionManager  // tracks active sessions
    link           transport.Link   // transport connection
    done           *done.Instance
}

当达到最大并发会话数(可配置,默认约 128)时,worker 被视为"已满"。

IncrementalWorkerPicker

按需创建新的 worker:

go
func (p *IncrementalWorkerPicker) PickAvailable() (*ClientWorker, error) {
    p.access.Lock()
    defer p.access.Unlock()

    idx := p.findAvailable()
    if idx >= 0 {
        return p.workers[idx], nil
    }

    // All workers full: create a new one
    worker, err := p.Factory.Create()
    p.workers = append(p.workers, worker)
    return worker, nil
}

ClientWorker.Dispatch

go
func (m *ClientWorker) Dispatch(ctx, link) bool {
    if m.IsFull() || m.Closed() {
        return false
    }

    // Allocate session
    s := m.sessionManager.Allocate()

    // Build frame metadata
    meta := FrameMetadata{
        SessionID:     s.ID,
        SessionStatus: SessionStatusNew,
        Target:        outbound.Target,
        Option:        OptionData,
    }

    // Write frame to transport
    // Copy data from link.Reader to transport (with framing)
    // Copy data from transport to link.Writer (with deframing)
}

服务端

ServerWorker

解复用传入帧并分发每个会话:

go
type ServerWorker struct {
    dispatcher     routing.Dispatcher
    link           *transport.Link
    sessionManager *SessionManager
}

func NewServerWorker(ctx, dispatcher, link) (*ServerWorker, error) {
    // Start worker goroutine
    go worker.run(ctx)
    return worker, nil
}

帧处理循环

go
func (w *ServerWorker) run(ctx) {
    for {
        // Read frame metadata
        meta := new(FrameMetadata)
        meta.Unmarshal(reader)

        switch meta.SessionStatus {
        case SessionStatusNew:
            // Create new session
            // Dispatch via routing
            s := w.sessionManager.Add(meta.SessionID)
            link := dispatcher.Dispatch(ctx, meta.Target)
            // Start copying between session and link

        case SessionStatusKeep:
            // Write data to existing session
            s := w.sessionManager.Get(meta.SessionID)
            // Read data payload and write to session

        case SessionStatusEnd:
            // Close session
            w.sessionManager.Remove(meta.SessionID)

        case SessionStatusKeepAlive:
            // No-op, just prevents connection timeout
        }
    }
}

会话管理器

go
type SessionManager struct {
    sessions map[uint16]*Session
    count    int
    closed   bool
}

type Session struct {
    input        buf.Reader    // data from transport
    output       buf.Writer    // data to transport
    parent       *SessionManager
    ID           uint16
    transferType protocol.TransferType
}

数据帧封装

为现有会话写入数据时:

go
// Writer wraps each write in a mux frame:
frame = [2B meta_len][2B session_id][status=Keep][option=Data]
data  = [2B data_len][payload]

读取时,读取器解析帧并将数据路由到正确的会话。

Mux 配置

json
{
  "outbounds": [{
    "mux": {
      "enabled": true,
      "concurrency": 8,    // max streams per connection
      "xudpConcurrency": 16,
      "xudpProxyUDP443": "reject"
    }
  }]
}

实现要点

  1. Session ID 空间:16 位,因此每个连接最多 65535 个会话。实际上并发量限制在约 128 以保证性能。

  2. 帧封装开销:每个数据帧增加 6-8 字节的元数据。对于小型 UDP 数据包,这一开销较为显著。

  3. 队头阻塞:所有会话共享一个 TCP 连接。如果一个会话卡住(如重传),所有会话都会受到影响。这是 Mux 的根本性权衡。

  4. KeepAliveSessionStatusKeepAlive 帧防止传输连接因空闲超时而关闭。在无数据流动时定期发送。

  5. Worker 生命周期:Worker 按需创建,每 30 秒定期清理。当 worker 的所有会话结束且没有新会话到达时,worker 会被关闭。

  6. XUDP 检测:Session ID 0 + network UDP 表示 XUDP 模式。详见 XUDP 协议

用于重新实现目的的技术分析。