Транспорт mKCP
Введение
mKCP (модифицированный KCP) -- это надежный UDP-транспорт, основанный на протоколе KCP (автор skywind3000). Он обеспечивает надежную, упорядоченную доставку поверх UDP с настраиваемым управлением перегрузками, жертвуя более высоким потреблением полосы пропускания ради снижения задержки. mKCP полезен в средах, где TCP работает плохо (высокие потери пакетов, высокая задержка), но UDP доступен. Он поддерживает опциональную обертку TLS и обфускацию заголовков пакетов.
Регистрация протокола
Зарегистрирован как "mkcp" (transport/internet/kcp/kcp.go:9):
const protocolName = "mkcp"- Дайлер:
kcp/dialer.go:95-97 - Слушатель:
kcp/listener.go:176-178 - Конфигурация:
kcp/config.go:80-84
Обзор архитектуры
flowchart TD
subgraph "Уровень приложения"
APP[Чтение/Запись приложения]
end
subgraph "Соединение KCP"
RW[ReceivingWorker / SendingWorker]
RTT[RoundTripInfo]
UPD[Updater - периодический сброс]
end
subgraph "Уровень сегментов"
DS[DataSegment]
AS[AckSegment]
CS[CmdOnlySegment]
end
subgraph "Вывод"
SW[SegmentWriter]
RSW[RetryableWriter]
end
subgraph "Сеть"
UDP[UDP-сокет / PacketConn]
end
APP --> RW
RW --> UPD
UPD -->|flush| RW
RW --> DS & AS & CS
DS & AS & CS --> SW --> RSW --> UDP
UDP --> |fetchInput| RW
RTT -->|timeout/rto| RWПроцесс Dial
DialKCP (kcp/dialer.go:48-93) создает KCP-соединение поверх UDP-сокета:
func DialKCP(ctx context.Context, dest net.Destination,
streamSettings *internet.MemoryStreamConfig) (stat.Connection, error) {
dest.Network = net.Network_UDP // Принудительно UDP
rawConn, _ := internet.DialSystem(ctx, dest, streamSettings.SocketSettings)
// Опциональная UDP-маска
if streamSettings.UdpmaskManager != nil {
wrapper := rawConn.(*internet.PacketConnWrapper)
wrapper.Conn, _ = streamSettings.UdpmaskManager.WrapPacketConnClient(raw)
}
kcpSettings := streamSettings.ProtocolSettings.(*Config)
conv := uint16(atomic.AddUint32(&globalConv, 1))
session := NewConnection(ConnMetadata{
LocalAddr: rawConn.LocalAddr(),
RemoteAddr: rawConn.RemoteAddr(),
Conversation: conv,
}, rawConn, rawConn, kcpSettings)
go fetchInput(ctx, rawConn, reader, session)
// Опциональный TLS
if config := tls.ConfigFromStreamSettings(streamSettings); config != nil {
return tls.Client(session, config.GetTLSConfig(tls.WithDestination(dest))), nil
}
return session, nil
}Горутина fetchInput (kcp/dialer.go:20-45) непрерывно читает из UDP-сокета и передает сегменты в KCP-соединение:
func fetchInput(_ context.Context, input io.Reader, reader PacketReader, conn *Connection) {
cache := make(chan *buf.Buffer, 1024)
go func() {
for {
payload := buf.New()
payload.ReadFrom(input)
cache <- payload
}
}()
for payload := range cache {
segments := reader.Read(payload.Bytes())
conn.Input(segments)
}
}Процесс Listen
NewListener (kcp/listener.go:35-61) создает UDP-хаб и распределяет входящие пакеты по KCP-сессиям:
func NewListener(ctx context.Context, address net.Address, port net.Port,
streamSettings *internet.MemoryStreamConfig, addConn internet.ConnHandler) (*Listener, error) {
hub, _ := udp.ListenUDP(ctx, address, port, streamSettings, udp.HubCapacity(1024))
go l.handlePackets()
return l, nil
}Демультиплексирование сессий
OnReceive (kcp/listener.go:70-122) демультиплексирует входящие UDP-пакеты по KCP-сессиям по ключу {Удаленный адрес, Порт, ID разговора}:
func (l *Listener) OnReceive(payload *buf.Buffer, src net.Destination) {
segments := l.reader.Read(payload.Bytes())
conv := segments[0].Conversation()
id := ConnectionID{Remote: src.Address, Port: src.Port, Conv: conv}
conn, found := l.sessions[id]
if !found {
// Создание нового KCP-соединения
conn = NewConnection(ConnMetadata{...}, writer, writer, l.config)
l.addConn(netConn) // с опциональной TLS-оберткой
l.sessions[id] = conn
}
conn.Input(segments)
}Формат сегментов на проводе
Все сегменты имеют общий 4-байтный заголовок (kcp/segment.go:276-303):
+--+--+--+--+
| Conv |Cmd|Opt|
+--+--+--+--+- Conv (2 байта, big-endian): Идентификатор разговора
- Cmd (1 байт): Тип команды
- Opt (1 байт): Опции сегмента (бит 0 = Close)
DataSegment (Cmd=1)
Смещение Размер Поле
0 2 Conv
2 1 Cmd (0x01)
3 1 Option
4 4 Timestamp
8 4 Number (номер последовательности)
12 4 SendingNext
16 2 DataLen
18 N PayloadОбщие накладные расходы: 18 байт на сегмент данных (DataSegmentOverhead = 18).
AckSegment (Cmd=0)
Смещение Размер Поле
0 2 Conv
2 1 Cmd (0x00)
3 1 Option
4 4 ReceivingWindow
8 4 ReceivingNext
12 4 Timestamp
16 1 Count
17 4*N NumberList (подтвержденные номера последовательностей)До 128 номеров ACK на сегмент (ackNumberLimit = 128).
CmdOnlySegment (Cmd=2,3)
Смещение Размер Поле
0 2 Conv
2 1 Cmd (0x02=Terminate, 0x03=Ping)
3 1 Option
4 4 SendingNext
8 4 ReceivingNext
12 4 PeerRTOФиксированные 16 байт. Используется для keep-alive пингов и завершения соединения.
Соединение KCP
Конечный автомат
Connection (kcp/connection.go:179-204) поддерживает конечный автомат с 6 состояниями:
stateDiagram-v2
[*] --> Active
Active --> ReadyToClose: Close()
Active --> PeerClosed: узел отправил Close
Active --> PeerTerminating: узел отправил Terminate
ReadyToClose --> Terminating: буфер отправки пуст ИЛИ узел Close/Terminate
PeerClosed --> Terminating: Close()
PeerTerminating --> Terminating: таймаут (4 с)
PeerTerminating --> Terminated: Close()
Terminating --> Terminated: таймаут (8 с)
ReadyToClose --> Terminating: таймаут (15 с)
Terminated --> [*]: Terminate()Система Updater
Две горутины Updater управляют соединением (kcp/connection.go:123-170):
- dataUpdater: Запускается с интервалом TTI, когда необходима работа с отправкой/получением. Сбрасывает ACK и повторно передает данные.
- pingUpdater: Запускается каждые 5 секунд. Отправляет keep-alive пинги и обрабатывает таймауты.
Оба используют механизм пробуждения на основе семафора для избежания ненужных горутин (connection.go:142-148):
func (u *Updater) WakeUp() {
select {
case <-u.notifier.Wait():
go u.run()
default:
}
}Отслеживание времени обхода (RTT)
RoundTripInfo (kcp/connection.go:52-121) реализует оценку RTT согласно RFC 6298:
func (info *RoundTripInfo) Update(rtt uint32, current uint32) {
// SRTT = 7/8 * SRTT + 1/8 * RTT
// RTTVAR = 3/4 * RTTVAR + 1/4 * |SRTT - RTT|
// RTO = SRTT + max(4*RTTVAR, minRTT)
// RTO ограничен 10 секундами, затем умножается на 5/4
}Обработка входящих данных
Connection.Input (kcp/connection.go:560-605) обрабатывает входящие сегменты по типу:
func (c *Connection) Input(segments []Segment) {
for _, seg := range segments {
switch seg := seg.(type) {
case *DataSegment:
c.receivingWorker.ProcessSegment(seg)
c.dataInput.Signal() // пробуждение Read()
case *AckSegment:
c.sendingWorker.ProcessSegment(current, seg, c.roundTrip.Timeout())
c.dataOutput.Signal() // пробуждение Write()
case *CmdOnlySegment:
// Обработка Terminate, обновление RTO узла
}
}
}Цикл сброса
flush (kcp/connection.go:607-644) вызывается периодически updater'ами:
- Закрытие бездействующих соединений (таймаут 30 с) (
connection.go:613-615) - Переход в
ReadyToClose, когда буфер отправки пуст (connection.go:616-618) - Отправка
CommandTerminateв состоянииTerminating(connection.go:620-628) - Сброс списка ACK (
connection.go:638) - Сброс окна отправки (повторная передача/отправка новых) (
connection.go:639) - Отправка keep-alive пинга каждые 3 секунды (
connection.go:641-643)
Конфигурация
Из kcp/config.go:
| Параметр | По умолчанию | Описание |
|---|---|---|
| MTU | 1350 | Максимальный размер передаваемого блока (байты) |
| TTI | 50 | Интервал передачи (мс) |
| UplinkCapacity | 5 | Пропускная способность выгрузки (МБ/с) |
| DownlinkCapacity | 20 | Пропускная способность загрузки (МБ/с) |
| WriteBuffer | 2 МБ | Размер буфера записи |
Производные значения:
- MSS (Maximum Segment Size):
MTU - DataSegmentOverhead= 1350 - 18 = 1332 байт - SendingInFlightSize:
UplinkCapacity * 1024 * 1024 / MTU / (1000 / TTI), минимум 8 - SendingBufferSize:
WriteBuffer / MTU - ReceivingInFlightSize:
DownlinkCapacity * 1024 * 1024 / MTU / (1000 / TTI), минимум 8
Уровень вывода
SegmentWriter (kcp/output.go:12-13) сериализует сегменты и записывает в сеть:
type SegmentWriter interface {
Write(seg Segment) error
}SimpleSegmentWriter (output.go:15-37) сериализует в буфер и записывает:
func (w *SimpleSegmentWriter) Write(seg Segment) error {
w.buffer.Clear()
rawBytes := w.buffer.Extend(seg.ByteSize())
seg.Serialize(rawBytes)
_, err := w.writer.Write(w.buffer.Bytes())
return err
}RetryableWriter (output.go:39-53) оборачивает записи с повторными попытками до 5 раз с интервалом 100 мс.
Чтение пакетов
KCPPacketReader (kcp/io.go:7-20) разбирает байтовый буфер на несколько сегментов:
func (r *KCPPacketReader) Read(b []byte) []Segment {
var result []Segment
for len(b) > 0 {
seg, x := ReadSegment(b)
if seg == nil { break }
result = append(result, seg)
b = x
}
return result
}ReadSegment (kcp/segment.go:276-303) разбирает 4-байтный заголовок для определения типа сегмента и делегирует соответствующему методу parse.
Поддержка UDP-маски
mKCP поддерживает обфускацию пакетов через UdpmaskManager (kcp/dialer.go:57-71):
if streamSettings.UdpmaskManager != nil {
wrapper := rawConn.(*internet.PacketConnWrapper)
wrapper.Conn, _ = streamSettings.UdpmaskManager.WrapPacketConnClient(raw)
}Это оборачивает PacketConn для трансформации UDP-пакетов, затрудняя их идентификацию через DPI.
Примечания по реализации
- Без FEC: В отличие от оригинальной реализации KCP от xtaci, mKCP в Xray не реализует прямую коррекцию ошибок (FEC). Надежность обеспечивается исключительно через повторную передачу.
- Идентификатор разговора: 16-битное значение, глобально инкрементируемое для каждого соединения (
dialer.go:18). Это ограничивает одновременные исходящие соединения примерно 65 тысячами до переполнения. - Таймаут бездействия 30 секунд: Соединения без входящих данных в течение 30 секунд закрываются (
connection.go:613-615). - Модель горутин: Каждое соединение имеет 2 горутины updater (data + ping), плюс горутину
fetchInputдля чтения. Updater'ы запускаются лениво через семафор (WakeUp). - Владение буферами: DataSegment'ы владеют своими буферами полезной нагрузки. Сегменты должны быть освобождены через
Release()после обработки для возврата буферов в пул. - Отслеживание сессий Writer: Структура
Writerслушателя (listener.go:156-170) отслеживаетConnectionID, чтобы удалить сессию из слушателя при закрытии. - TLS поверх KCP: Опциональная TLS-обертка работает поверх надежного KCP-потока, а не поверх сырого UDP. Это означает, что TLS видит упорядоченный байтовый поток.
- Без REALITY: mKCP не поддерживает REALITY (только стандартный TLS), поскольку REALITY требует манипуляции TLS 1.3 ClientHello, что предполагает TCP-транспорт.