Мультиплексирование Mux
Mux (мультиплексирование) позволяет нескольким логическим потокам совместно использовать одно транспортное соединение. Это сокращает задержку установки соединения и накладные расходы, особенно при транспортах с высокой задержкой.
Исходный код: common/mux/
Архитектура
flowchart LR
subgraph Клиент
S1[Поток 1]
S2[Поток 2]
S3[Поток 3]
CM[ClientManager]
CW[ClientWorker]
end
subgraph Transport["Единое транспортное соединение"]
Frames["Mux-кадры<br/>(чередующиеся)"]
end
subgraph Сервер
SW[ServerWorker]
D1[Диспетчеризация 1]
D2[Диспетчеризация 2]
D3[Диспетчеризация 3]
end
S1 --> CM
S2 --> CM
S3 --> CM
CM --> CW
CW --> Frames
Frames --> SW
SW --> D1
SW --> D2
SW --> D3Формат кадра
+--+--+--+--+--+--+--+--+--+--+--+--+--+--+
| Meta Len | Session ID | Status | Option |
| (2B BE) | (2B BE) | (1B) | (1B) |
+--+--+--+--+--+--+--+--+--+--+--+--+--+--+
For SessionStatusNew:
+--+--+--+--+--+--+--+--+
| Network | Port | Addr |
| (1B) | (2B) | (var)|
+--+--+--+--+--+--+--+--+
Optional (XUDP): [8B GlobalID]
Data payload:
+--+--+--+--+
| Data Len | (2B BE, followed by payload bytes)
+--+--+--+--+Длина метаданных
Первые 2 байта кодируют длину следующих за ними метаданных (ID сессии + статус + опции + адрес назначения).
ID сессии
2-байтовый идентификатор сессии. Каждый логический поток получает уникальный ID в рамках соединения. ID=0 с сетью UDP используется для обнаружения XUDP.
Статус сессии
| Значение | Название | Описание |
|---|---|---|
0x01 | New | Новая сессия (включает адрес назначения) |
0x02 | Keep | Продолжение существующей сессии (далее следуют данные) |
0x03 | End | Сессия закрыта |
0x04 | KeepAlive | Поддержание соединения (без данных) |
Флаги опций
| Бит | Название | Описание |
|---|---|---|
0x01 | OptionData | Кадр содержит полезную нагрузку |
0x02 | OptionError | Сессия завершилась с ошибкой |
Целевая сеть
| Значение | Сеть |
|---|---|
0x01 | TCP |
0x02 | UDP |
Формат адреса
Port: 2 bytes big-endian
AddrType: 0x01=IPv4, 0x02=Domain, 0x03=IPv6
Address: 4B (IPv4), 1B+NB (domain len+domain), 16B (IPv6)Клиентская сторона
ClientManager
Точка входа для mux на стороне исходящего соединения:
type ClientManager struct {
Enabled bool
Picker WorkerPicker
}
func (m *ClientManager) Dispatch(ctx, link) error {
for i := 0; i < 16; i++ {
worker, _ := m.Picker.PickAvailable()
if worker.Dispatch(ctx, link) {
return nil
}
}
return errors.New("unable to find an available mux client")
}ClientWorker
Каждый ClientWorker управляет одним транспортным соединением с несколькими сессиями:
type ClientWorker struct {
sessionManager *SessionManager // tracks active sessions
link transport.Link // transport connection
done *done.Instance
}Воркер считается «заполненным», когда достигнуто максимальное количество параллельных сессий (настраиваемое, по умолчанию ~128).
IncrementalWorkerPicker
Создаёт новых воркеров по запросу:
func (p *IncrementalWorkerPicker) PickAvailable() (*ClientWorker, error) {
p.access.Lock()
defer p.access.Unlock()
idx := p.findAvailable()
if idx >= 0 {
return p.workers[idx], nil
}
// All workers full: create a new one
worker, err := p.Factory.Create()
p.workers = append(p.workers, worker)
return worker, nil
}ClientWorker.Dispatch
func (m *ClientWorker) Dispatch(ctx, link) bool {
if m.IsFull() || m.Closed() {
return false
}
// Allocate session
s := m.sessionManager.Allocate()
// Build frame metadata
meta := FrameMetadata{
SessionID: s.ID,
SessionStatus: SessionStatusNew,
Target: outbound.Target,
Option: OptionData,
}
// Write frame to transport
// Copy data from link.Reader to transport (with framing)
// Copy data from transport to link.Writer (with deframing)
}Серверная сторона
ServerWorker
Демультиплексирует входящие кадры и диспетчеризует каждую сессию:
type ServerWorker struct {
dispatcher routing.Dispatcher
link *transport.Link
sessionManager *SessionManager
}
func NewServerWorker(ctx, dispatcher, link) (*ServerWorker, error) {
// Start worker goroutine
go worker.run(ctx)
return worker, nil
}Цикл обработки кадров
func (w *ServerWorker) run(ctx) {
for {
// Read frame metadata
meta := new(FrameMetadata)
meta.Unmarshal(reader)
switch meta.SessionStatus {
case SessionStatusNew:
// Create new session
// Dispatch via routing
s := w.sessionManager.Add(meta.SessionID)
link := dispatcher.Dispatch(ctx, meta.Target)
// Start copying between session and link
case SessionStatusKeep:
// Write data to existing session
s := w.sessionManager.Get(meta.SessionID)
// Read data payload and write to session
case SessionStatusEnd:
// Close session
w.sessionManager.Remove(meta.SessionID)
case SessionStatusKeepAlive:
// No-op, just prevents connection timeout
}
}
}Менеджер сессий
type SessionManager struct {
sessions map[uint16]*Session
count int
closed bool
}
type Session struct {
input buf.Reader // data from transport
output buf.Writer // data to transport
parent *SessionManager
ID uint16
transferType protocol.TransferType
}Кадрирование данных
При записи данных для существующей сессии:
// Writer wraps each write in a mux frame:
frame = [2B meta_len][2B session_id][status=Keep][option=Data]
data = [2B data_len][payload]При чтении reader разбирает кадры и направляет данные в соответствующую сессию.
Конфигурация Mux
{
"outbounds": [{
"mux": {
"enabled": true,
"concurrency": 8, // max streams per connection
"xudpConcurrency": 16,
"xudpProxyUDP443": "reject"
}
}]
}Замечания по реализации
Пространство ID сессий: 16-битное, максимум 65535 сессий на соединение. На практике параллельность ограничена ~128 для производительности.
Накладные расходы кадрирования: Каждый кадр данных добавляет 6-8 байт метаданных. Для небольших UDP-пакетов эти накладные расходы значительны.
Блокировка head-of-line: Все сессии разделяют одно TCP-соединение. Если одна сессия зависает (например, ретрансмиссия), все сессии затрагиваются. Это фундаментальный компромисс мультиплексирования.
KeepAlive: Кадр
SessionStatusKeepAliveпредотвращает закрытие транспортного соединения по тайм-ауту бездействия. Отправляется периодически при отсутствии потока данных.Жизненный цикл воркеров: Воркеры создаются по запросу и очищаются периодически (каждые 30 секунд). Воркер закрывается, когда все его сессии завершаются и новые не появляются.
Обнаружение XUDP: ID сессии 0 + сеть UDP указывает на режим XUDP. Подробнее см. Протокол XUDP.