Mux Multiplexing

Mux (multiplexing) allows multiple logical streams to share a single transport connection. This reduces connection setup latency and overhead, especially over high-latency transports.

Source: common/mux/

Architecture

mermaid
flowchart LR
    subgraph Client
        S1[Stream 1]
        S2[Stream 2]
        S3[Stream 3]
        CM[ClientManager]
        CW[ClientWorker]
    end

    subgraph Transport["Single Transport Connection"]
        Frames["Mux Frames<br/>(interleaved)"]
    end

    subgraph Server
        SW[ServerWorker]
        D1[Dispatch 1]
        D2[Dispatch 2]
        D3[Dispatch 3]
    end

    S1 --> CM
    S2 --> CM
    S3 --> CM
    CM --> CW
    CW --> Frames
    Frames --> SW
    SW --> D1
    SW --> D2
    SW --> D3

Frame Format

+--+--+--+--+--+--+--+--+--+--+--+--+--+--+
| Meta Len  | Session ID | Status | Option |
| (2B BE)   | (2B BE)    | (1B)   | (1B)   |
+--+--+--+--+--+--+--+--+--+--+--+--+--+--+

For SessionStatusNew:
+--+--+--+--+--+--+--+--+
| Network | Port  | Addr |
| (1B)    | (2B)  | (var)|
+--+--+--+--+--+--+--+--+

Optional (XUDP): [8B GlobalID]

Data payload:
+--+--+--+--+
| Data Len  |  (2B BE, followed by payload bytes)
+--+--+--+--+

Meta Length

The first 2 bytes (big-endian) encode the length of the metadata that follows, not counting the length field itself: session ID + status + option, plus the target when one is present.

Session ID

2-byte session identifier. Each logical stream gets a unique ID within the connection. ID=0 with network=UDP is used to detect XUDP.

Session Status

| Value | Name | Description |
|-------|------|-------------|
| 0x01 | New | New session (includes target address) |
| 0x02 | Keep | Continue existing session (data follows) |
| 0x03 | End | Session closed |
| 0x04 | KeepAlive | Connection keep-alive (no data) |

Option Flags

| Bit | Name | Description |
|-----|------|-------------|
| 0x01 | OptionData | Frame contains data payload |
| 0x02 | OptionError | Session ended with error |
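
The fixed part of the metadata (meta length, session ID, status, option) can be sketched as a small encoder and parser. This is a minimal illustration of the layout described above, not the code in common/mux/; the `Header` type and the `Marshal`, `ParseHeader`, and `HasData` names are assumptions made for this example.

```go
package main

import (
	"encoding/binary"
	"errors"
)

// Status and option values taken from the tables above.
const (
	StatusNew       byte = 0x01
	StatusKeep      byte = 0x02
	StatusEnd       byte = 0x03
	StatusKeepAlive byte = 0x04

	OptionData  byte = 0x01
	OptionError byte = 0x02
)

// Header is a hypothetical in-memory form of the fixed metadata fields.
type Header struct {
	SessionID uint16
	Status    byte
	Option    byte
}

// Marshal writes meta length, session ID, status and option: 6 bytes total.
func (h Header) Marshal() []byte {
	b := make([]byte, 6)
	binary.BigEndian.PutUint16(b[0:2], 4) // metadata that follows: 2+1+1 bytes
	binary.BigEndian.PutUint16(b[2:4], h.SessionID)
	b[4] = h.Status
	b[5] = h.Option
	return b
}

// ParseHeader reads the fixed fields. It returns any additional metadata
// bytes (for example the target of a New frame) and the bytes consumed.
func ParseHeader(b []byte) (h Header, extra []byte, n int, err error) {
	if len(b) < 2 {
		return h, nil, 0, errors.New("short buffer: missing meta length")
	}
	metaLen := int(binary.BigEndian.Uint16(b[0:2]))
	if metaLen < 4 {
		return h, nil, 0, errors.New("meta length too small")
	}
	if len(b) < 2+metaLen {
		return h, nil, 0, errors.New("short buffer: truncated metadata")
	}
	h.SessionID = binary.BigEndian.Uint16(b[2:4])
	h.Status = b[4]
	h.Option = b[5]
	return h, b[6 : 2+metaLen], 2 + metaLen, nil
}

// HasData reports whether a data payload follows the metadata.
func (h Header) HasData() bool { return h.Option&OptionData != 0 }
```

Because the option field is a bit set, `HasData` tests the bit rather than comparing for equality, so a frame carrying both OptionData and OptionError is still recognised as having a payload.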

Target Network

| Value | Network |
|-------|---------|
| 0x01 | TCP |
| 0x02 | UDP |

Address Format

Port:     2 bytes big-endian
AddrType: 0x01=IPv4, 0x02=Domain, 0x03=IPv6
Address:  4B (IPv4), 1B+NB (domain len+domain), 16B (IPv6)
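
To make the target layout concrete, the following sketch encodes the network byte, port, address type, and address in the order given above. The `EncodeTarget` function and the constant names are hypothetical, and the 255-byte domain limit is simply what a 1-byte length prefix allows.

```go
package main

import (
	"encoding/binary"
	"errors"
	"net"
)

const (
	NetworkTCP byte = 0x01
	NetworkUDP byte = 0x02

	AddrIPv4   byte = 0x01
	AddrDomain byte = 0x02
	AddrIPv6   byte = 0x03
)

// EncodeTarget returns network (1B), port (2B big-endian), address type (1B)
// and the address bytes. A host that parses as an IP literal is encoded as
// IPv4 or IPv6; anything else is treated as a domain name.
func EncodeTarget(network byte, host string, port uint16) ([]byte, error) {
	out := []byte{network, 0, 0}
	binary.BigEndian.PutUint16(out[1:3], port)

	if ip := net.ParseIP(host); ip != nil {
		if v4 := ip.To4(); v4 != nil {
			return append(append(out, AddrIPv4), v4...), nil
		}
		return append(append(out, AddrIPv6), ip.To16()...), nil
	}

	// Domain: one length byte followed by the name itself.
	if len(host) == 0 || len(host) > 255 {
		return nil, errors.New("domain must be 1..255 bytes")
	}
	out = append(out, AddrDomain, byte(len(host)))
	return append(out, host...), nil
}
```

For example, `EncodeTarget(NetworkTCP, "1.2.3.4", 443)` yields the 8 bytes `01 01 BB 01 01 02 03 04`.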

Client Side

ClientManager

The entry point for mux on the outbound side:

go
type ClientManager struct {
    Enabled bool
    Picker  WorkerPicker
}

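// Dispatch tries up to 16 times to hand the link to a worker that accepts it.
func (m *ClientManager) Dispatch(ctx context.Context, link *transport.Link) error {
    for i := 0; i < 16; i++ {
        worker, err := m.Picker.PickAvailable()
        if err != nil {
            return err
        }
        if worker.Dispatch(ctx, link) {
            return nil
        }
    }
    return errors.New("unable to find an available mux client")
}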

ClientWorker

Each ClientWorker manages one transport connection with multiple sessions:

go
type ClientWorker struct {
    sessionManager *SessionManager  // tracks active sessions
    link           transport.Link   // transport connection
    done           *done.Instance
}

A worker is "full" when it has reached its maximum number of concurrent sessions. The limit is configurable through the concurrency option (see Mux Configuration) and is capped at roughly 128.

IncrementalWorkerPicker

Creates new workers on demand:

go
func (p *IncrementalWorkerPicker) PickAvailable() (*ClientWorker, error) {
    p.access.Lock()
    defer p.access.Unlock()

    idx := p.findAvailable()
    if idx >= 0 {
        return p.workers[idx], nil
    }

    // All workers full: create a new one
    worker, err := p.Factory.Create()
    if err != nil {
        return nil, err
    }
    p.workers = append(p.workers, worker)
    return worker, nil
}
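
The pick-or-create behaviour can be tried out in isolation with a toy worker that only counts sessions. This is a sketch under stated assumptions: the `worker` and `picker` types and the `create` hook are stand-ins invented for the example, not the real ClientWorker or factory types.

```go
package main

import (
	"errors"
	"sync"
)

// worker is a toy stand-in for ClientWorker: it only counts sessions.
type worker struct {
	sessions, max int
	closed        bool
}

// available reports whether the worker can take one more session.
func (w *worker) available() bool { return !w.closed && w.sessions < w.max }

// picker mirrors the shape of IncrementalWorkerPicker.
type picker struct {
	mu      sync.Mutex
	workers []*worker
	create  func() (*worker, error) // hypothetical factory hook
}

// PickAvailable returns the first worker with spare capacity and creates
// a new worker only when every existing one is full or closed.
func (p *picker) PickAvailable() (*worker, error) {
	p.mu.Lock()
	defer p.mu.Unlock()

	for _, w := range p.workers {
		if w.available() {
			return w, nil
		}
	}
	if p.create == nil {
		return nil, errors.New("no worker factory configured")
	}
	w, err := p.create()
	if err != nil {
		return nil, err // a failed factory call must not append a nil worker
	}
	p.workers = append(p.workers, w)
	return w, nil
}
```

Holding the lock across the factory call keeps two callers from creating two workers at once, at the cost of blocking other pickers while a new transport connection is being set up.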

ClientWorker.Dispatch

go
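// Dispatch attaches link to this worker as a new session.
// It returns false when the worker cannot accept another session.
func (m *ClientWorker) Dispatch(ctx context.Context, link *transport.Link) bool {
    if m.IsFull() || m.Closed() {
        return false
    }

    // Allocate session
    s := m.sessionManager.Allocate()

    // Build frame metadata; the target comes from the outbound
    // information attached to ctx (shown here as outbound.Target)
    meta := FrameMetadata{
        SessionID:     s.ID,
        SessionStatus: SessionStatusNew,
        Target:        outbound.Target,
        Option:        OptionData,
    }

    // Remaining steps (elided):
    //   - Write frame to transport
    //   - Copy data from link.Reader to transport (with framing)
    //   - Copy data from transport to link.Writer (with deframing)
    return true
}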

Server Side

ServerWorker

Demultiplexes incoming frames and dispatches each session:

go
type ServerWorker struct {
    dispatcher     routing.Dispatcher
    link           *transport.Link
    sessionManager *SessionManager
}

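func NewServerWorker(ctx context.Context, d routing.Dispatcher, link *transport.Link) (*ServerWorker, error) {
    worker := &ServerWorker{
        dispatcher:     d,
        link:           link,
        sessionManager: NewSessionManager(),
    }
    // Start worker goroutine
    go worker.run(ctx)
    return worker, nil
}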

Frame Processing Loop

go
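func (w *ServerWorker) run(ctx context.Context) {
    // All frames arrive on the shared transport link
    reader := &buf.BufferedReader{Reader: w.link.Reader}
    for {
        // Read frame metadata; stop on EOF or a malformed frame
        meta := new(FrameMetadata)
        if err := meta.Unmarshal(reader); err != nil {
            return
        }

        switch meta.SessionStatus {
        case SessionStatusNew:
            // Create new session
            // Dispatch via routing
            s := w.sessionManager.Add(meta.SessionID)
            link, err := w.dispatcher.Dispatch(ctx, meta.Target)
            // On error: close s. Otherwise start copying between s and link

        case SessionStatusKeep:
            // Write data to existing session
            s := w.sessionManager.Get(meta.SessionID)
            // Read data payload and write to session.
            // If the session is unknown, the payload must still be read
            // and discarded so the reader stays aligned on frame boundaries

        case SessionStatusEnd:
            // Close session
            w.sessionManager.Remove(meta.SessionID)

        case SessionStatusKeepAlive:
            // No-op, just prevents connection timeout
        }
    }
}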
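
The loop above can be exercised without any transport by running the same parsing steps over an in-memory byte stream. The sketch below is a simplified stand-in rather than the real ServerWorker: `Demux` and `DemuxBytes` are hypothetical names, sessions are plain byte slices in a map, and the target carried by New frames is skipped rather than dispatched.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"io"
)

// Demux reads frames from r until EOF and collects payload bytes per session.
func Demux(r io.Reader) (map[uint16][]byte, error) {
	sessions := make(map[uint16][]byte)
	var lenBuf [2]byte
	for {
		if _, err := io.ReadFull(r, lenBuf[:]); err != nil {
			if err == io.EOF {
				return sessions, nil // clean end between two frames
			}
			return sessions, err
		}
		meta := make([]byte, binary.BigEndian.Uint16(lenBuf[:]))
		if len(meta) < 4 {
			return sessions, errors.New("metadata shorter than 4 bytes")
		}
		if _, err := io.ReadFull(r, meta); err != nil {
			return sessions, err
		}
		id := binary.BigEndian.Uint16(meta[0:2])
		status, option := meta[2], meta[3]

		// Always consume the payload so the stream stays frame-aligned.
		var payload []byte
		if option&0x01 != 0 { // OptionData
			if _, err := io.ReadFull(r, lenBuf[:]); err != nil {
				return sessions, err
			}
			payload = make([]byte, binary.BigEndian.Uint16(lenBuf[:]))
			if _, err := io.ReadFull(r, payload); err != nil {
				return sessions, err
			}
		}

		switch status {
		case 0x01: // New: register the session; meta[4:] holds the target
			sessions[id] = append([]byte{}, payload...)
		case 0x02: // Keep: append to a known session, drop data for unknown IDs
			if _, ok := sessions[id]; ok {
				sessions[id] = append(sessions[id], payload...)
			}
		case 0x03: // End: a real worker would close and remove the session here
		case 0x04: // KeepAlive: nothing to do
		default:
			return sessions, errors.New("unknown session status")
		}
	}
}

// DemuxBytes is a convenience wrapper for in-memory input.
func DemuxBytes(b []byte) (map[uint16][]byte, error) {
	return Demux(bytes.NewReader(b))
}
```

Reading the payload before switching on the status is a deliberate choice in this sketch: it guarantees that a frame for an unknown or already closed session cannot desynchronise the parser.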

Session Manager

go
type SessionManager struct {
    sessions map[uint16]*Session
    count    int
    closed   bool
}

type Session struct {
    input        buf.Reader    // data from transport
    output       buf.Writer    // data to transport
    parent       *SessionManager
    ID           uint16
    transferType protocol.TransferType
}
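
A reduced version of the session bookkeeping might look like the following. It is a sketch, not the real SessionManager: the lower-case types, the `max` parameter on `Allocate`, and the choice to skip ID 0 (so it stays free for XUDP detection) are assumptions made for the example.

```go
package main

import "sync"

// session carries only the ID here; the real type also holds the
// reader, writer and transfer type listed above.
type session struct {
	ID uint16
}

type sessionManager struct {
	mu       sync.Mutex
	sessions map[uint16]*session
	lastID   uint16 // last ID handed out by Allocate
	closed   bool
}

func newSessionManager() *sessionManager {
	return &sessionManager{sessions: make(map[uint16]*session)}
}

// Allocate is the client-side path: it picks a free, non-zero ID.
// It returns nil when the manager is closed or max sessions are active.
func (m *sessionManager) Allocate(max int) *session {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.closed || len(m.sessions) >= max || len(m.sessions) >= 0xFFFF {
		return nil
	}
	for {
		m.lastID++ // wraps around after 65535
		if _, used := m.sessions[m.lastID]; m.lastID != 0 && !used {
			break
		}
	}
	s := &session{ID: m.lastID}
	m.sessions[s.ID] = s
	return s
}

// Add is the server-side path: the ID was chosen by the peer.
func (m *sessionManager) Add(id uint16) *session {
	m.mu.Lock()
	defer m.mu.Unlock()
	s := &session{ID: id}
	m.sessions[id] = s
	return s
}

// Get looks up a session by ID.
func (m *sessionManager) Get(id uint16) (*session, bool) {
	m.mu.Lock()
	defer m.mu.Unlock()
	s, ok := m.sessions[id]
	return s, ok
}

// Remove forgets a session, freeing capacity for Allocate.
func (m *sessionManager) Remove(id uint16) {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.sessions, id)
}
```

In this sketch IDs are not reused immediately after a session ends; the counter keeps moving forward, which reduces the chance that a late frame for an old session is mistaken for a new one.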

Data Framing

When writing data for an existing session:

go
// Writer wraps each write in a mux frame:
//   frame = [2B meta_len][2B session_id][status=Keep][option=Data]
//   data  = [2B data_len][payload]

When reading, the reader parses frames and routes data to the correct session.
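
On the writing side, the framing can be sketched as an io.Writer wrapper that emits one Keep frame per chunk. The `sessionWriter` type and `EncodeKeep` helper are hypothetical, and splitting at 65535 bytes is an assumption that follows from the 2-byte data length; the real writer may choose a smaller chunk size.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"io"
)

const maxChunk = 0xFFFF // largest value the 2-byte data length can hold

// sessionWriter frames everything written to it as Keep frames for one session.
type sessionWriter struct {
	id  uint16
	dst io.Writer
}

// Write splits p into chunks of at most maxChunk bytes and writes each
// chunk as [meta_len=4][session_id][Keep][OptionData][data_len][payload].
func (w *sessionWriter) Write(p []byte) (int, error) {
	written := 0
	for len(p) > 0 {
		chunk := p
		if len(chunk) > maxChunk {
			chunk = chunk[:maxChunk]
		}
		frame := make([]byte, 8, 8+len(chunk))
		binary.BigEndian.PutUint16(frame[0:2], 4) // session ID + status + option
		binary.BigEndian.PutUint16(frame[2:4], w.id)
		frame[4] = 0x02 // SessionStatusKeep
		frame[5] = 0x01 // OptionData
		binary.BigEndian.PutUint16(frame[6:8], uint16(len(chunk)))
		frame = append(frame, chunk...)

		// Header and payload go out in a single Write so frames from
		// different sessions cannot interleave inside one frame.
		if _, err := w.dst.Write(frame); err != nil {
			return written, err
		}
		written += len(chunk)
		p = p[len(chunk):]
	}
	return written, nil
}

// EncodeKeep returns the framed bytes for payload in memory.
func EncodeKeep(id uint16, payload []byte) []byte {
	var buf bytes.Buffer
	w := &sessionWriter{id: id, dst: &buf}
	w.Write(payload) // bytes.Buffer never returns a write error
	return buf.Bytes()
}
```

For example, `EncodeKeep(7, []byte("abc"))` produces the 11 bytes `00 04 00 07 02 01 00 03 61 62 63`. An empty payload produces no frame at all in this sketch.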

Mux Configuration

json
{
  "outbounds": [{
    "mux": {
      "enabled": true,
      "concurrency": 8,    // max streams per connection
      "xudpConcurrency": 16,
      "xudpProxyUDP443": "reject"
    }
  }]
}
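
For tooling that needs to read these settings, a plain struct with JSON tags is enough. The sketch below is an assumption-laden illustration: `MuxConfig`, `FirstMux`, and the wrapper types are hypothetical names, and only the four keys shown above are mapped.

```go
package main

import (
	"encoding/json"
	"errors"
)

// MuxConfig mirrors the keys of the "mux" object shown above.
type MuxConfig struct {
	Enabled         bool   `json:"enabled"`
	Concurrency     int    `json:"concurrency"`
	XudpConcurrency int    `json:"xudpConcurrency"`
	XudpProxyUDP443 string `json:"xudpProxyUDP443"`
}

type outboundConfig struct {
	Mux *MuxConfig `json:"mux"`
}

type rootConfig struct {
	Outbounds []outboundConfig `json:"outbounds"`
}

// FirstMux returns the mux settings of the first outbound that has any.
func FirstMux(raw []byte) (*MuxConfig, error) {
	var root rootConfig
	if err := json.Unmarshal(raw, &root); err != nil {
		return nil, err
	}
	for _, ob := range root.Outbounds {
		if ob.Mux != nil {
			return ob.Mux, nil
		}
	}
	return nil, errors.New("no outbound with a mux section")
}
```

Note that Go's encoding/json rejects `//` comments, so the inline comment in the example above has to be stripped before the text is passed to `FirstMux`.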

Implementation Notes

  1. Session ID space: 16-bit, so max 65535 sessions per connection. In practice, concurrency is limited to ~128 for performance.

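  2. Framing overhead: Each frame carries a 6-byte header (2B meta length + 2B session ID + 1B status + 1B option); a frame with a payload adds a 2-byte data length, for 8 bytes in total. For small UDP packets, this overhead is significant.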

  3. Head-of-line blocking: All sessions share one transport connection, typically TCP. If that connection stalls (e.g., a lost segment waiting for retransmission), every session on it stalls too. This is the fundamental tradeoff of mux.

  4. KeepAlive: The SessionStatusKeepAlive frame prevents the transport connection from being closed by idle timeout. Sent periodically when no data flows.

  5. Worker lifecycle: Workers are created on demand and cleaned up periodically (every 30s). A worker is closed when all its sessions end and no new sessions arrive.

  6. XUDP detection: Session ID 0 + network UDP indicates XUDP mode. See XUDP Protocol for details.
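
Two of the notes above reduce to simple arithmetic and a predicate, sketched here with hypothetical helper names. The 8-byte figure assumes a Keep frame with a payload and no extra metadata, as laid out in the Frame Format section.

```go
package main

const (
	headerBytes  = 6 // meta length (2) + session ID (2) + status (1) + option (1)
	dataLenBytes = 2 // data length prefix, present when OptionData is set
)

// WireSize returns the bytes on the wire for one Keep frame carrying
// payload bytes of data.
func WireSize(payload int) int {
	return headerBytes + dataLenBytes + payload
}

// OverheadRatio returns the share of the wire bytes spent on framing.
func OverheadRatio(payload int) float64 {
	return float64(headerBytes+dataLenBytes) / float64(WireSize(payload))
}

// IsXUDP applies the detection rule from note 6: session ID 0 together
// with network UDP (0x02).
func IsXUDP(sessionID uint16, network byte) bool {
	return sessionID == 0 && network == 0x02
}
```

A 32-byte UDP payload becomes 40 bytes on the wire, so one fifth of the traffic is framing, while a 1400-byte payload pays well under 1%.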

Technical analysis for re-implementation purposes.