Skip to content

Мультиплексирование Mux

Mux (мультиплексирование) позволяет нескольким логическим потокам совместно использовать одно транспортное соединение. Это сокращает задержку установки соединения и накладные расходы, особенно при транспортах с высокой задержкой.

Исходный код: common/mux/

Архитектура

mermaid
flowchart LR
    subgraph Клиент
        S1[Поток 1]
        S2[Поток 2]
        S3[Поток 3]
        CM[ClientManager]
        CW[ClientWorker]
    end

    subgraph Transport["Единое транспортное соединение"]
        Frames["Mux-кадры<br/>(чередующиеся)"]
    end

    subgraph Сервер
        SW[ServerWorker]
        D1[Диспетчеризация 1]
        D2[Диспетчеризация 2]
        D3[Диспетчеризация 3]
    end

    S1 --> CM
    S2 --> CM
    S3 --> CM
    CM --> CW
    CW --> Frames
    Frames --> SW
    SW --> D1
    SW --> D2
    SW --> D3

Формат кадра

+--+--+--+--+--+--+--+--+--+--+--+--+--+--+
| Meta Len  | Session ID | Status | Option |
| (2B BE)   | (2B BE)    | (1B)   | (1B)   |
+--+--+--+--+--+--+--+--+--+--+--+--+--+--+

For SessionStatusNew:
+--+--+--+--+--+--+--+--+
| Network | Port  | Addr |
| (1B)    | (2B)  | (var)|
+--+--+--+--+--+--+--+--+

Optional (XUDP): [8B GlobalID]

Data payload:
+--+--+--+--+
| Data Len  |  (2B BE, followed by payload bytes)
+--+--+--+--+

Длина метаданных

Первые 2 байта кодируют длину следующих за ними метаданных (ID сессии + статус + опции + адрес назначения).

ID сессии

2-байтовый идентификатор сессии. Каждый логический поток получает уникальный ID в рамках соединения. ID=0 с сетью UDP используется для обнаружения XUDP.

Статус сессии

ЗначениеНазваниеОписание
0x01NewНовая сессия (включает адрес назначения)
0x02KeepПродолжение существующей сессии (далее следуют данные)
0x03EndСессия закрыта
0x04KeepAliveПоддержание соединения (без данных)

Флаги опций

БитНазваниеОписание
0x01OptionDataКадр содержит полезную нагрузку
0x02OptionErrorСессия завершилась с ошибкой

Целевая сеть

ЗначениеСеть
0x01TCP
0x02UDP

Формат адреса

Port:     2 bytes big-endian
AddrType: 0x01=IPv4, 0x02=Domain, 0x03=IPv6
Address:  4B (IPv4), 1B+NB (domain len+domain), 16B (IPv6)

Клиентская сторона

ClientManager

Точка входа для mux на стороне исходящего соединения:

go
type ClientManager struct {
    Enabled bool
    Picker  WorkerPicker
}

func (m *ClientManager) Dispatch(ctx, link) error {
    for i := 0; i < 16; i++ {
        worker, _ := m.Picker.PickAvailable()
        if worker.Dispatch(ctx, link) {
            return nil
        }
    }
    return errors.New("unable to find an available mux client")
}

ClientWorker

Каждый ClientWorker управляет одним транспортным соединением с несколькими сессиями:

go
type ClientWorker struct {
    sessionManager *SessionManager  // tracks active sessions
    link           transport.Link   // transport connection
    done           *done.Instance
}

Воркер считается «заполненным», когда достигнуто максимальное количество параллельных сессий (настраиваемое, по умолчанию ~128).

IncrementalWorkerPicker

Создаёт новых воркеров по запросу:

go
func (p *IncrementalWorkerPicker) PickAvailable() (*ClientWorker, error) {
    p.access.Lock()
    defer p.access.Unlock()

    idx := p.findAvailable()
    if idx >= 0 {
        return p.workers[idx], nil
    }

    // All workers full: create a new one
    worker, err := p.Factory.Create()
    p.workers = append(p.workers, worker)
    return worker, nil
}

ClientWorker.Dispatch

go
func (m *ClientWorker) Dispatch(ctx, link) bool {
    if m.IsFull() || m.Closed() {
        return false
    }

    // Allocate session
    s := m.sessionManager.Allocate()

    // Build frame metadata
    meta := FrameMetadata{
        SessionID:     s.ID,
        SessionStatus: SessionStatusNew,
        Target:        outbound.Target,
        Option:        OptionData,
    }

    // Write frame to transport
    // Copy data from link.Reader to transport (with framing)
    // Copy data from transport to link.Writer (with deframing)
}

Серверная сторона

ServerWorker

Демультиплексирует входящие кадры и диспетчеризует каждую сессию:

go
type ServerWorker struct {
    dispatcher     routing.Dispatcher
    link           *transport.Link
    sessionManager *SessionManager
}

func NewServerWorker(ctx, dispatcher, link) (*ServerWorker, error) {
    // Start worker goroutine
    go worker.run(ctx)
    return worker, nil
}

Цикл обработки кадров

go
func (w *ServerWorker) run(ctx) {
    for {
        // Read frame metadata
        meta := new(FrameMetadata)
        meta.Unmarshal(reader)

        switch meta.SessionStatus {
        case SessionStatusNew:
            // Create new session
            // Dispatch via routing
            s := w.sessionManager.Add(meta.SessionID)
            link := dispatcher.Dispatch(ctx, meta.Target)
            // Start copying between session and link

        case SessionStatusKeep:
            // Write data to existing session
            s := w.sessionManager.Get(meta.SessionID)
            // Read data payload and write to session

        case SessionStatusEnd:
            // Close session
            w.sessionManager.Remove(meta.SessionID)

        case SessionStatusKeepAlive:
            // No-op, just prevents connection timeout
        }
    }
}

Менеджер сессий

go
type SessionManager struct {
    sessions map[uint16]*Session
    count    int
    closed   bool
}

type Session struct {
    input        buf.Reader    // data from transport
    output       buf.Writer    // data to transport
    parent       *SessionManager
    ID           uint16
    transferType protocol.TransferType
}

Кадрирование данных

При записи данных для существующей сессии:

go
// Writer wraps each write in a mux frame:
frame = [2B meta_len][2B session_id][status=Keep][option=Data]
data  = [2B data_len][payload]

При чтении reader разбирает кадры и направляет данные в соответствующую сессию.

Конфигурация Mux

json
{
  "outbounds": [{
    "mux": {
      "enabled": true,
      "concurrency": 8,    // max streams per connection
      "xudpConcurrency": 16,
      "xudpProxyUDP443": "reject"
    }
  }]
}

Замечания по реализации

  1. Пространство ID сессий: 16-битное, максимум 65535 сессий на соединение. На практике параллельность ограничена ~128 для производительности.

  2. Накладные расходы кадрирования: Каждый кадр данных добавляет 6-8 байт метаданных. Для небольших UDP-пакетов эти накладные расходы значительны.

  3. Блокировка head-of-line: Все сессии разделяют одно TCP-соединение. Если одна сессия зависает (например, ретрансмиссия), все сессии затрагиваются. Это фундаментальный компромисс мультиплексирования.

  4. KeepAlive: Кадр SessionStatusKeepAlive предотвращает закрытие транспортного соединения по тайм-ауту бездействия. Отправляется периодически при отсутствии потока данных.

  5. Жизненный цикл воркеров: Воркеры создаются по запросу и очищаются периодически (каждые 30 секунд). Воркер закрывается, когда все его сессии завершаются и новые не появляются.

  6. Обнаружение XUDP: ID сессии 0 + сеть UDP указывает на режим XUDP. Подробнее см. Протокол XUDP.

Технический анализ для целей повторной реализации.