Mux Multiplexing
Mux (multiplexing) allows multiple logical streams to share a single transport connection. This reduces connection setup latency and overhead, especially over high-latency transports.
Source: common/mux/
Architecture
flowchart LR
subgraph Client
S1[Stream 1]
S2[Stream 2]
S3[Stream 3]
CM[ClientManager]
CW[ClientWorker]
end
subgraph Transport["Single Transport Connection"]
Frames["Mux Frames<br/>(interleaved)"]
end
subgraph Server
SW[ServerWorker]
D1[Dispatch 1]
D2[Dispatch 2]
D3[Dispatch 3]
end
S1 --> CM
S2 --> CM
S3 --> CM
CM --> CW
CW --> Frames
Frames --> SW
SW --> D1
SW --> D2
SW --> D3
Frame Format
+--+--+--+--+--+--+--+--+--+--+--+--+--+--+
| Meta Len | Session ID | Status | Option |
| (2B BE) | (2B BE) | (1B) | (1B) |
+--+--+--+--+--+--+--+--+--+--+--+--+--+--+
For SessionStatusNew:
+--+--+--+--+--+--+--+--+
| Network | Port | Addr |
| (1B) | (2B) | (var)|
+--+--+--+--+--+--+--+--+
Optional (XUDP): [8B GlobalID]
Data payload:
+--+--+--+--+
| Data Len | (2B BE, followed by payload bytes)
+--+--+--+--+
Meta Length
The first 2 bytes encode the length of the metadata that follows (session ID + status + option + target).
Session ID
2-byte session identifier. Each logical stream gets a unique ID within the connection. ID=0 with network=UDP is used to detect XUDP.
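As a sketch of how this header is consumed (names like `frameHeader` and `parseHeader` are illustrative, not the actual common/mux API), the reader first takes the 2-byte big-endian meta length, then pulls the session ID, status, and option out of the metadata that follows:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// frameHeader mirrors the metadata layout described above.
// The names are illustrative, not the actual common/mux types.
type frameHeader struct {
	SessionID uint16
	Status    byte
	Option    byte
}

// parseHeader reads the 2-byte big-endian meta length, then the
// session ID, status, and option from the metadata that follows.
// The remaining metadata bytes (the target, for Status=New) are
// returned unparsed.
func parseHeader(b []byte) (frameHeader, []byte, error) {
	if len(b) < 2 {
		return frameHeader{}, nil, fmt.Errorf("short buffer")
	}
	metaLen := binary.BigEndian.Uint16(b[:2])
	if int(metaLen) < 4 || len(b) < 2+int(metaLen) {
		return frameHeader{}, nil, fmt.Errorf("invalid meta length %d", metaLen)
	}
	meta := b[2 : 2+int(metaLen)]
	h := frameHeader{
		SessionID: binary.BigEndian.Uint16(meta[:2]),
		Status:    meta[2],
		Option:    meta[3],
	}
	return h, meta[4:], nil
}

func main() {
	// 4-byte metadata: session 1, status New (0x01), option Data (0x01).
	frame := []byte{0x00, 0x04, 0x00, 0x01, 0x01, 0x01}
	h, rest, err := parseHeader(frame)
	fmt.Println(h.SessionID, h.Status, h.Option, len(rest), err) // 1 1 1 0 <nil>
}
```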
Session Status
| Value | Name | Description |
|---|---|---|
| 0x01 | New | New session (includes target address) |
| 0x02 | Keep | Continue existing session (data follows) |
| 0x03 | End | Session closed |
| 0x04 | KeepAlive | Connection keep-alive (no data) |
Option Flags
| Bit | Name | Description |
|---|---|---|
| 0x01 | OptionData | Frame contains data payload |
| 0x02 | OptionError | Session ended with error |
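The option byte is a bit field, so flags combine with bitwise OR and are tested with bitwise AND. A minimal sketch (constant names follow the table; they may not match the exact identifiers in common/mux):

```go
package main

import "fmt"

// Option flags from the table above; combined with bitwise OR.
const (
	OptionData  byte = 0x01 // frame carries a data payload
	OptionError byte = 0x02 // session ended with an error
)

// hasOption reports whether a frame's option byte has the given flag set.
func hasOption(opt, flag byte) bool {
	return opt&flag != 0
}

func main() {
	opt := OptionData | OptionError
	fmt.Println(hasOption(opt, OptionData), hasOption(opt, OptionError)) // true true
	fmt.Println(hasOption(OptionData, OptionError))                      // false
}
```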
Target Network
| Value | Network |
|---|---|
| 0x01 | TCP |
| 0x02 | UDP |
Address Format
Port: 2 bytes big-endian
AddrType: 0x01=IPv4, 0x02=Domain, 0x03=IPv6
Address: 4B (IPv4), 1B+NB (domain len+domain), 16B (IPv6)
Client Side
ClientManager
The entry point for mux on the outbound side:
type ClientManager struct {
Enabled bool
Picker WorkerPicker
}
func (m *ClientManager) Dispatch(ctx, link) error {
for i := 0; i < 16; i++ {
worker, err := m.Picker.PickAvailable()
if err != nil {
return err
}
if worker.Dispatch(ctx, link) {
return nil
}
}
return errors.New("unable to find an available mux client")
}
ClientWorker
Each ClientWorker manages one transport connection with multiple sessions:
type ClientWorker struct {
sessionManager *SessionManager // tracks active sessions
link transport.Link // transport connection
done *done.Instance
}
A worker is "full" when it has reached the max concurrent sessions (configurable, default ~128).
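The "full" check itself is simple; a hedged sketch (field and type names assumed here, the real worker consults its SessionManager):

```go
package main

import "fmt"

// sessionCounter stands in for the real SessionManager; only the
// active-session count matters for the "full" check.
type sessionCounter struct {
	count int
}

type clientWorker struct {
	sessions    *sessionCounter
	concurrency int // max concurrent sessions for this worker
}

// isFull reports whether the worker has reached its session limit,
// in which case the picker should skip it (or create a new worker).
func (w *clientWorker) isFull() bool {
	return w.sessions.count >= w.concurrency
}

func main() {
	w := &clientWorker{sessions: &sessionCounter{count: 8}, concurrency: 8}
	fmt.Println(w.isFull()) // true
	w.sessions.count = 3
	fmt.Println(w.isFull()) // false
}
```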
IncrementalWorkerPicker
Creates new workers on demand:
func (p *IncrementalWorkerPicker) PickAvailable() (*ClientWorker, error) {
p.access.Lock()
defer p.access.Unlock()
idx := p.findAvailable()
if idx >= 0 {
return p.workers[idx], nil
}
// All workers full: create a new one
worker, err := p.Factory.Create()
if err != nil {
return nil, err
}
p.workers = append(p.workers, worker)
return worker, nil
}
ClientWorker.Dispatch
func (m *ClientWorker) Dispatch(ctx, link) bool {
if m.IsFull() || m.Closed() {
return false
}
// Allocate session
s := m.sessionManager.Allocate()
// Build frame metadata
meta := FrameMetadata{
SessionID: s.ID,
SessionStatus: SessionStatusNew,
Target: outbound.Target,
Option: OptionData,
}
// Write frame to transport
// Copy data from link.Reader to transport (with framing)
// Copy data from transport to link.Writer (with deframing)
}
Server Side
ServerWorker
Demultiplexes incoming frames and dispatches each session:
type ServerWorker struct {
dispatcher routing.Dispatcher
link *transport.Link
sessionManager *SessionManager
}
func NewServerWorker(ctx, dispatcher, link) (*ServerWorker, error) {
// Start worker goroutine
go worker.run(ctx)
return worker, nil
}
Frame Processing Loop
func (w *ServerWorker) run(ctx) {
for {
// Read frame metadata
meta := new(FrameMetadata)
meta.Unmarshal(reader)
switch meta.SessionStatus {
case SessionStatusNew:
// Create new session
// Dispatch via routing
s := w.sessionManager.Add(meta.SessionID)
link := dispatcher.Dispatch(ctx, meta.Target)
// Start copying between session and link
case SessionStatusKeep:
// Write data to existing session
s := w.sessionManager.Get(meta.SessionID)
// Read data payload and write to session
case SessionStatusEnd:
// Close session
w.sessionManager.Remove(meta.SessionID)
case SessionStatusKeepAlive:
// No-op, just prevents connection timeout
}
}
}
Session Manager
type SessionManager struct {
sessions map[uint16]*Session
count int
closed bool
}
type Session struct {
input buf.Reader // data from transport
output buf.Writer // data to transport
parent *SessionManager
ID uint16
transferType protocol.TransferType
}
Data Framing
When writing data for an existing session:
// Writer wraps each write in a mux frame:
frame = [2B meta_len][2B session_id][status=Keep][option=Data]
data = [2B data_len][payload]
When reading, the reader parses frames and routes data to the correct session.
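A sketch of the writing half, laying out the bytes exactly as described above (the real writer works with buf.Buffer rather than raw slices, and the status/option values are the ones from the tables earlier):

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// writeKeepFrame frames a payload for an existing session:
// [2B meta_len][2B session_id][status=Keep][option=Data][2B data_len][payload].
func writeKeepFrame(sessionID uint16, payload []byte) []byte {
	const metaLen = 4 // session ID (2B) + status (1B) + option (1B)
	frame := make([]byte, 2+metaLen+2, 2+metaLen+2+len(payload))
	binary.BigEndian.PutUint16(frame[0:2], metaLen)
	binary.BigEndian.PutUint16(frame[2:4], sessionID)
	frame[4] = 0x02 // SessionStatusKeep
	frame[5] = 0x01 // OptionData
	binary.BigEndian.PutUint16(frame[6:8], uint16(len(payload)))
	return append(frame, payload...)
}

func main() {
	f := writeKeepFrame(7, []byte("hi"))
	fmt.Printf("% x\n", f) // 00 04 00 07 02 01 00 02 68 69
}
```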
Mux Configuration
{
"outbounds": [{
"mux": {
"enabled": true,
"concurrency": 8, // max streams per connection
"xudpConcurrency": 16,
"xudpProxyUDP443": "reject"
}
}]
}
Implementation Notes
Session ID space: 16-bit, so max 65535 sessions per connection. In practice, concurrency is limited to ~128 for performance.
Framing overhead: Each data frame adds 6-8 bytes of metadata. For small UDP packets, this overhead is significant.
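To make the overhead note concrete, a quick calculation assuming the 8-byte worst case (2B meta length + 4B metadata + 2B data length):

```go
package main

import "fmt"

// overheadPercent computes the framing overhead for one Keep+Data frame,
// assuming the 8-byte worst case: 2B meta length + 4B metadata + 2B data length.
func overheadPercent(payloadBytes int) float64 {
	const frameOverhead = 8
	return 100 * float64(frameOverhead) / float64(payloadBytes+frameOverhead)
}

func main() {
	fmt.Printf("%.1f%%\n", overheadPercent(50))   // small UDP packet: 13.8%
	fmt.Printf("%.1f%%\n", overheadPercent(1400)) // near-MTU packet: 0.6%
}
```

The framing cost is negligible for bulk TCP streams but a double-digit percentage for small datagrams, which is the motivation for the "significant for small UDP packets" caveat above.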
Head-of-line blocking: All sessions share one TCP connection. If one session stalls (e.g., retransmission), all sessions are affected. This is the fundamental tradeoff of mux.
KeepAlive: The SessionStatusKeepAlive frame prevents the transport connection from being closed by idle timeout. Sent periodically when no data flows.
Worker lifecycle: Workers are created on demand and cleaned up periodically (every 30s). A worker is closed when all its sessions end and no new sessions arrive.
XUDP detection: Session ID 0 + network UDP indicates XUDP mode. See XUDP Protocol for details.
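The detection condition is a two-field check; a minimal sketch (constant and function names are illustrative):

```go
package main

import "fmt"

const TargetNetworkUDP byte = 0x02 // from the Target Network table

// isXUDP reports whether a New-session frame signals XUDP mode:
// session ID 0 combined with a UDP target network.
func isXUDP(sessionID uint16, network byte) bool {
	return sessionID == 0 && network == TargetNetworkUDP
}

func main() {
	fmt.Println(isXUDP(0, TargetNetworkUDP)) // true
	fmt.Println(isXUDP(1, TargetNetworkUDP)) // false: ordinary UDP session
	fmt.Println(isXUDP(0, 0x01))             // false: TCP cannot be XUDP
}
```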