Skip to content

Транспорт mKCP

Введение

mKCP (модифицированный KCP) -- это надежный UDP-транспорт, основанный на протоколе KCP (автор skywind3000). Он обеспечивает надежную, упорядоченную доставку поверх UDP с настраиваемым управлением перегрузками, жертвуя более высоким потреблением полосы пропускания ради снижения задержки. mKCP полезен в средах, где TCP работает плохо (высокие потери пакетов, высокая задержка), но UDP доступен. Он поддерживает опциональную обертку TLS и обфускацию заголовков пакетов.

Регистрация протокола

Зарегистрирован как "mkcp" (transport/internet/kcp/kcp.go:9):

go
const protocolName = "mkcp"
  • Дайлер: kcp/dialer.go:95-97
  • Слушатель: kcp/listener.go:176-178
  • Конфигурация: kcp/config.go:80-84

Обзор архитектуры

mermaid
flowchart TD
    subgraph "Уровень приложения"
        APP[Чтение/Запись приложения]
    end

    subgraph "Соединение KCP"
        RW[ReceivingWorker / SendingWorker]
        RTT[RoundTripInfo]
        UPD[Updater - периодический сброс]
    end

    subgraph "Уровень сегментов"
        DS[DataSegment]
        AS[AckSegment]
        CS[CmdOnlySegment]
    end

    subgraph "Вывод"
        SW[SegmentWriter]
        RSW[RetryableWriter]
    end

    subgraph "Сеть"
        UDP[UDP-сокет / PacketConn]
    end

    APP --> RW
    RW --> UPD
    UPD -->|flush| RW
    RW --> DS & AS & CS
    DS & AS & CS --> SW --> RSW --> UDP
    UDP --> |fetchInput| RW
    RTT -->|timeout/rto| RW

Процесс Dial

DialKCP (kcp/dialer.go:48-93) создает KCP-соединение поверх UDP-сокета:

go
func DialKCP(ctx context.Context, dest net.Destination,
    streamSettings *internet.MemoryStreamConfig) (stat.Connection, error) {
    dest.Network = net.Network_UDP  // Принудительно UDP
    rawConn, _ := internet.DialSystem(ctx, dest, streamSettings.SocketSettings)

    // Опциональная UDP-маска
    if streamSettings.UdpmaskManager != nil {
        wrapper := rawConn.(*internet.PacketConnWrapper)
        wrapper.Conn, _ = streamSettings.UdpmaskManager.WrapPacketConnClient(raw)
    }

    kcpSettings := streamSettings.ProtocolSettings.(*Config)
    conv := uint16(atomic.AddUint32(&globalConv, 1))
    session := NewConnection(ConnMetadata{
        LocalAddr:    rawConn.LocalAddr(),
        RemoteAddr:   rawConn.RemoteAddr(),
        Conversation: conv,
    }, rawConn, rawConn, kcpSettings)

    go fetchInput(ctx, rawConn, reader, session)

    // Опциональный TLS
    if config := tls.ConfigFromStreamSettings(streamSettings); config != nil {
        return tls.Client(session, config.GetTLSConfig(tls.WithDestination(dest))), nil
    }
    return session, nil
}

Горутина fetchInput (kcp/dialer.go:20-45) непрерывно читает из UDP-сокета и передает сегменты в KCP-соединение:

go
func fetchInput(_ context.Context, input io.Reader, reader PacketReader, conn *Connection) {
    cache := make(chan *buf.Buffer, 1024)
    go func() {
        for {
            payload := buf.New()
            payload.ReadFrom(input)
            cache <- payload
        }
    }()
    for payload := range cache {
        segments := reader.Read(payload.Bytes())
        conn.Input(segments)
    }
}

Процесс Listen

NewListener (kcp/listener.go:35-61) создает UDP-хаб и распределяет входящие пакеты по KCP-сессиям:

go
func NewListener(ctx context.Context, address net.Address, port net.Port,
    streamSettings *internet.MemoryStreamConfig, addConn internet.ConnHandler) (*Listener, error) {
    hub, _ := udp.ListenUDP(ctx, address, port, streamSettings, udp.HubCapacity(1024))
    go l.handlePackets()
    return l, nil
}

Демультиплексирование сессий

OnReceive (kcp/listener.go:70-122) демультиплексирует входящие UDP-пакеты по KCP-сессиям по ключу {Удаленный адрес, Порт, ID разговора}:

go
func (l *Listener) OnReceive(payload *buf.Buffer, src net.Destination) {
    segments := l.reader.Read(payload.Bytes())
    conv := segments[0].Conversation()
    id := ConnectionID{Remote: src.Address, Port: src.Port, Conv: conv}

    conn, found := l.sessions[id]
    if !found {
        // Создание нового KCP-соединения
        conn = NewConnection(ConnMetadata{...}, writer, writer, l.config)
        l.addConn(netConn)  // с опциональной TLS-оберткой
        l.sessions[id] = conn
    }
    conn.Input(segments)
}

Формат сегментов на проводе

Все сегменты имеют общий 4-байтный заголовок (kcp/segment.go:276-303):

+--+--+--+--+
|  Conv  |Cmd|Opt|
+--+--+--+--+
  • Conv (2 байта, big-endian): Идентификатор разговора
  • Cmd (1 байт): Тип команды
  • Opt (1 байт): Опции сегмента (бит 0 = Close)

DataSegment (Cmd=1)

Смещение  Размер  Поле
0       2     Conv
2       1     Cmd (0x01)
3       1     Option
4       4     Timestamp
8       4     Number (номер последовательности)
12      4     SendingNext
16      2     DataLen
18      N     Payload

Общие накладные расходы: 18 байт на сегмент данных (DataSegmentOverhead = 18).

AckSegment (Cmd=0)

Смещение  Размер  Поле
0       2     Conv
2       1     Cmd (0x00)
3       1     Option
4       4     ReceivingWindow
8       4     ReceivingNext
12      4     Timestamp
16      1     Count
17      4*N   NumberList (подтвержденные номера последовательностей)

До 128 номеров ACK на сегмент (ackNumberLimit = 128).

CmdOnlySegment (Cmd=2,3)

Смещение  Размер  Поле
0       2     Conv
2       1     Cmd (0x02=Terminate, 0x03=Ping)
3       1     Option
4       4     SendingNext
8       4     ReceivingNext
12      4     PeerRTO

Фиксированные 16 байт. Используется для keep-alive пингов и завершения соединения.

Соединение KCP

Конечный автомат

Connection (kcp/connection.go:179-204) поддерживает конечный автомат с 6 состояниями:

mermaid
stateDiagram-v2
    [*] --> Active
    Active --> ReadyToClose: Close()
    Active --> PeerClosed: узел отправил Close
    Active --> PeerTerminating: узел отправил Terminate

    ReadyToClose --> Terminating: буфер отправки пуст ИЛИ узел Close/Terminate
    PeerClosed --> Terminating: Close()
    PeerTerminating --> Terminating: таймаут (4 с)
    PeerTerminating --> Terminated: Close()

    Terminating --> Terminated: таймаут (8 с)
    ReadyToClose --> Terminating: таймаут (15 с)

    Terminated --> [*]: Terminate()

Система Updater

Две горутины Updater управляют соединением (kcp/connection.go:123-170):

  1. dataUpdater: Запускается с интервалом TTI, когда необходима работа с отправкой/получением. Сбрасывает ACK и повторно передает данные.
  2. pingUpdater: Запускается каждые 5 секунд. Отправляет keep-alive пинги и обрабатывает таймауты.

Оба используют механизм пробуждения на основе семафора для избежания ненужных горутин (connection.go:142-148):

go
func (u *Updater) WakeUp() {
    select {
    case <-u.notifier.Wait():
        go u.run()
    default:
    }
}

Отслеживание времени обхода (RTT)

RoundTripInfo (kcp/connection.go:52-121) реализует оценку RTT согласно RFC 6298:

go
func (info *RoundTripInfo) Update(rtt uint32, current uint32) {
    // SRTT = 7/8 * SRTT + 1/8 * RTT
    // RTTVAR = 3/4 * RTTVAR + 1/4 * |SRTT - RTT|
    // RTO = SRTT + max(4*RTTVAR, minRTT)
    // RTO ограничен 10 секундами, затем умножается на 5/4
}

Обработка входящих данных

Connection.Input (kcp/connection.go:560-605) обрабатывает входящие сегменты по типу:

go
func (c *Connection) Input(segments []Segment) {
    for _, seg := range segments {
        switch seg := seg.(type) {
        case *DataSegment:
            c.receivingWorker.ProcessSegment(seg)
            c.dataInput.Signal()      // пробуждение Read()
        case *AckSegment:
            c.sendingWorker.ProcessSegment(current, seg, c.roundTrip.Timeout())
            c.dataOutput.Signal()     // пробуждение Write()
        case *CmdOnlySegment:
            // Обработка Terminate, обновление RTO узла
        }
    }
}

Цикл сброса

flush (kcp/connection.go:607-644) вызывается периодически updater'ами:

  1. Закрытие бездействующих соединений (таймаут 30 с) (connection.go:613-615)
  2. Переход в ReadyToClose, когда буфер отправки пуст (connection.go:616-618)
  3. Отправка CommandTerminate в состоянии Terminating (connection.go:620-628)
  4. Сброс списка ACK (connection.go:638)
  5. Сброс окна отправки (повторная передача/отправка новых) (connection.go:639)
  6. Отправка keep-alive пинга каждые 3 секунды (connection.go:641-643)

Конфигурация

Из kcp/config.go:

ПараметрПо умолчаниюОписание
MTU1350Максимальный размер передаваемого блока (байты)
TTI50Интервал передачи (мс)
UplinkCapacity5Пропускная способность выгрузки (МБ/с)
DownlinkCapacity20Пропускная способность загрузки (МБ/с)
WriteBuffer2 МБРазмер буфера записи

Производные значения:

  • MSS (Maximum Segment Size): MTU - DataSegmentOverhead = 1350 - 18 = 1332 байт
  • SendingInFlightSize: UplinkCapacity * 1024 * 1024 / MTU / (1000 / TTI), минимум 8
  • SendingBufferSize: WriteBuffer / MTU
  • ReceivingInFlightSize: DownlinkCapacity * 1024 * 1024 / MTU / (1000 / TTI), минимум 8

Уровень вывода

SegmentWriter (kcp/output.go:12-13) сериализует сегменты и записывает в сеть:

go
type SegmentWriter interface {
    Write(seg Segment) error
}

SimpleSegmentWriter (output.go:15-37) сериализует в буфер и записывает:

go
func (w *SimpleSegmentWriter) Write(seg Segment) error {
    w.buffer.Clear()
    rawBytes := w.buffer.Extend(seg.ByteSize())
    seg.Serialize(rawBytes)
    _, err := w.writer.Write(w.buffer.Bytes())
    return err
}

RetryableWriter (output.go:39-53) оборачивает записи с повторными попытками до 5 раз с интервалом 100 мс.

Чтение пакетов

KCPPacketReader (kcp/io.go:7-20) разбирает байтовый буфер на несколько сегментов:

go
func (r *KCPPacketReader) Read(b []byte) []Segment {
    var result []Segment
    for len(b) > 0 {
        seg, x := ReadSegment(b)
        if seg == nil { break }
        result = append(result, seg)
        b = x
    }
    return result
}

ReadSegment (kcp/segment.go:276-303) разбирает 4-байтный заголовок для определения типа сегмента и делегирует соответствующему методу parse.

Поддержка UDP-маски

mKCP поддерживает обфускацию пакетов через UdpmaskManager (kcp/dialer.go:57-71):

go
if streamSettings.UdpmaskManager != nil {
    wrapper := rawConn.(*internet.PacketConnWrapper)
    wrapper.Conn, _ = streamSettings.UdpmaskManager.WrapPacketConnClient(raw)
}

Это оборачивает PacketConn для трансформации UDP-пакетов, затрудняя их идентификацию через DPI.

Примечания по реализации

  • Без FEC: В отличие от оригинальной реализации KCP от xtaci, mKCP в Xray не реализует прямую коррекцию ошибок (FEC). Надежность обеспечивается исключительно через повторную передачу.
  • Идентификатор разговора: 16-битное значение, глобально инкрементируемое для каждого соединения (dialer.go:18). Это ограничивает одновременные исходящие соединения примерно 65 тысячами до переполнения.
  • Таймаут бездействия 30 секунд: Соединения без входящих данных в течение 30 секунд закрываются (connection.go:613-615).
  • Модель горутин: Каждое соединение имеет 2 горутины updater (data + ping), плюс горутину fetchInput для чтения. Updater'ы запускаются лениво через семафор (WakeUp).
  • Владение буферами: DataSegment'ы владеют своими буферами полезной нагрузки. Сегменты должны быть освобождены через Release() после обработки для возврата буферов в пул.
  • Отслеживание сессий Writer: Структура Writer слушателя (listener.go:156-170) отслеживает ConnectionID, чтобы удалить сессию из слушателя при закрытии.
  • TLS поверх KCP: Опциональная TLS-обертка работает поверх надежного KCP-потока, а не поверх сырого UDP. Это означает, что TLS видит упорядоченный байтовый поток.
  • Без REALITY: mKCP не поддерживает REALITY (только стандартный TLS), поскольку REALITY требует манипуляции TLS 1.3 ClientHello, что предполагает TCP-транспорт.

Технический анализ для целей повторной реализации.