Skip to content

mKCP 传输

简介

mKCP(modified KCP)是一种基于 UDP 的可靠传输协议,灵感来自 skywind3000 的 KCP 协议。它在 UDP 之上提供可靠、有序的数据传递,具有可配置的拥塞控制,以更高的带宽消耗换取更低的延迟。mKCP 适用于 TCP 性能较差(高丢包、高延迟)但 UDP 可用的环境。它支持可选的 TLS 包装和数据包头部伪装。

协议注册

"mkcp" 名称注册(transport/internet/kcp/kcp.go:9):

go
const protocolName = "mkcp"
  • 拨号器kcp/dialer.go:95-97
  • 监听器kcp/listener.go:176-178
  • 配置kcp/config.go:80-84

架构概述

mermaid
flowchart TD
    subgraph "应用层"
        APP[应用程序读/写]
    end

    subgraph "KCP 连接"
        RW[ReceivingWorker / SendingWorker]
        RTT[RoundTripInfo]
        UPD[Updater - 定期刷新]
    end

    subgraph "分段层"
        DS[DataSegment]
        AS[AckSegment]
        CS[CmdOnlySegment]
    end

    subgraph "输出"
        SW[SegmentWriter]
        RSW[RetryableWriter]
    end

    subgraph "网络"
        UDP[UDP 套接字 / PacketConn]
    end

    APP --> RW
    RW --> UPD
    UPD -->|flush| RW
    RW --> DS & AS & CS
    DS & AS & CS --> SW --> RSW --> UDP
    UDP --> |fetchInput| RW
    RTT -->|timeout/rto| RW

拨号流程

DialKCPkcp/dialer.go:48-93)通过 UDP 套接字创建 KCP 连接:

go
func DialKCP(ctx context.Context, dest net.Destination,
    streamSettings *internet.MemoryStreamConfig) (stat.Connection, error) {
    dest.Network = net.Network_UDP  // 强制使用 UDP
    rawConn, _ := internet.DialSystem(ctx, dest, streamSettings.SocketSettings)

    // 可选的 UDP 掩码
    if streamSettings.UdpmaskManager != nil {
        wrapper := rawConn.(*internet.PacketConnWrapper)
        wrapper.Conn, _ = streamSettings.UdpmaskManager.WrapPacketConnClient(raw)
    }

    kcpSettings := streamSettings.ProtocolSettings.(*Config)
    conv := uint16(atomic.AddUint32(&globalConv, 1))
    session := NewConnection(ConnMetadata{
        LocalAddr:    rawConn.LocalAddr(),
        RemoteAddr:   rawConn.RemoteAddr(),
        Conversation: conv,
    }, rawConn, rawConn, kcpSettings)

    go fetchInput(ctx, rawConn, reader, session)

    // 可选的 TLS
    if config := tls.ConfigFromStreamSettings(streamSettings); config != nil {
        return tls.Client(session, config.GetTLSConfig(tls.WithDestination(dest))), nil
    }
    return session, nil
}

fetchInput 协程(kcp/dialer.go:20-45)持续从 UDP 套接字读取并将分段提供给 KCP 连接:

go
func fetchInput(_ context.Context, input io.Reader, reader PacketReader, conn *Connection) {
    cache := make(chan *buf.Buffer, 1024)
    go func() {
        for {
            payload := buf.New()
            payload.ReadFrom(input)
            cache <- payload
        }
    }()
    for payload := range cache {
        segments := reader.Read(payload.Bytes())
        conn.Input(segments)
    }
}

监听流程

NewListenerkcp/listener.go:35-61)创建 UDP hub 并将传入数据包分发到 KCP 会话:

go
func NewListener(ctx context.Context, address net.Address, port net.Port,
    streamSettings *internet.MemoryStreamConfig, addConn internet.ConnHandler) (*Listener, error) {
    hub, _ := udp.ListenUDP(ctx, address, port, streamSettings, udp.HubCapacity(1024))
    go l.handlePackets()
    return l, nil
}

会话解复用

OnReceivekcp/listener.go:70-122)通过 {远程地址, 端口, 会话 ID} 将传入的 UDP 数据包解复用到 KCP 会话:

go
func (l *Listener) OnReceive(payload *buf.Buffer, src net.Destination) {
    segments := l.reader.Read(payload.Bytes())
    conv := segments[0].Conversation()
    id := ConnectionID{Remote: src.Address, Port: src.Port, Conv: conv}

    conn, found := l.sessions[id]
    if !found {
        // 创建新的 KCP 连接
        conn = NewConnection(ConnMetadata{...}, writer, writer, l.config)
        l.addConn(netConn)  // 可选的 TLS 包装
        l.sessions[id] = conn
    }
    conn.Input(segments)
}

分段线格式

所有分段共享 4 字节头部(kcp/segment.go:276-303):

+--+--+--+--+
|  Conv  |Cmd|Opt|
+--+--+--+--+
  • Conv(2 字节,大端序):会话 ID
  • Cmd(1 字节):命令类型
  • Opt(1 字节):分段选项(第 0 位 = 关闭)

DataSegment(Cmd=1)

偏移    大小  字段
0       2     Conv
2       1     Cmd (0x01)
3       1     Option
4       4     Timestamp
8       4     Number(序列号)
12      4     SendingNext
16      2     DataLen
18      N     Payload

每个数据分段总开销:18 字节(DataSegmentOverhead = 18)。

AckSegment(Cmd=0)

偏移    大小  字段
0       2     Conv
2       1     Cmd (0x00)
3       1     Option
4       4     ReceivingWindow
8       4     ReceivingNext
12      4     Timestamp
16      1     Count
17      4*N   NumberList(已确认的序列号)

每个分段最多 128 个 ACK 号(ackNumberLimit = 128)。

CmdOnlySegment(Cmd=2,3)

偏移    大小  字段
0       2     Conv
2       1     Cmd (0x02=终止, 0x03=Ping)
3       1     Option
4       4     SendingNext
8       4     ReceivingNext
12      4     PeerRTO

固定 16 字节。用于 keep-alive ping 和连接终止。

KCP 连接

状态机

Connectionkcp/connection.go:179-204)维护一个包含 6 种状态的状态机:

mermaid
stateDiagram-v2
    [*] --> Active
    Active --> ReadyToClose: Close()
    Active --> PeerClosed: 对端发送 Close
    Active --> PeerTerminating: 对端发送 Terminate

    ReadyToClose --> Terminating: 发送缓冲区为空 或 对端 Close/Terminate
    PeerClosed --> Terminating: Close()
    PeerTerminating --> Terminating: 超时 (4s)
    PeerTerminating --> Terminated: Close()

    Terminating --> Terminated: 超时 (8s)
    ReadyToClose --> Terminating: 超时 (15s)

    Terminated --> [*]: Terminate()

Updater 系统

两个 Updater 协程驱动连接(kcp/connection.go:123-170):

  1. dataUpdater:当有发送/接收工作时以 TTI 间隔运行。刷新 ACK 并重传数据。
  2. pingUpdater:每 5 秒运行。发送 keep-alive ping 并处理超时。

两者均使用基于信号量的唤醒机制以避免不必要的协程(connection.go:142-148):

go
func (u *Updater) WakeUp() {
    select {
    case <-u.notifier.Wait():
        go u.run()
    default:
    }
}

往返时间跟踪

RoundTripInfokcp/connection.go:52-121)实现 RFC 6298 RTT 估算:

go
func (info *RoundTripInfo) Update(rtt uint32, current uint32) {
    // SRTT = 7/8 * SRTT + 1/8 * RTT
    // RTTVAR = 3/4 * RTTVAR + 1/4 * |SRTT - RTT|
    // RTO = SRTT + max(4*RTTVAR, minRTT)
    // RTO 上限为 10 秒,然后乘以 5/4
}

输入处理

Connection.Inputkcp/connection.go:560-605)按类型处理传入分段:

go
func (c *Connection) Input(segments []Segment) {
    for _, seg := range segments {
        switch seg := seg.(type) {
        case *DataSegment:
            c.receivingWorker.ProcessSegment(seg)
            c.dataInput.Signal()      // 唤醒 Read()
        case *AckSegment:
            c.sendingWorker.ProcessSegment(current, seg, c.roundTrip.Timeout())
            c.dataOutput.Signal()     // 唤醒 Write()
        case *CmdOnlySegment:
            // 处理 Terminate,更新对端 RTO
        }
    }
}

刷新周期

flushkcp/connection.go:607-644)由 updater 定期调用:

  1. 关闭空闲连接(30 秒超时)(connection.go:613-615
  2. 发送缓冲区为空时转换为 ReadyToCloseconnection.go:616-618
  3. Terminating 状态下发送 CommandTerminateconnection.go:620-628
  4. 刷新 ACK 列表(connection.go:638
  5. 刷新发送窗口(重传/发送新数据)(connection.go:639
  6. 每 3 秒发送 keep-alive ping(connection.go:641-643

配置

来自 kcp/config.go

参数默认值描述
MTU1350最大传输单元(字节)
TTI50传输时间间隔(毫秒)
UplinkCapacity5上传带宽(MB/s)
DownlinkCapacity20下载带宽(MB/s)
WriteBuffer2 MB写缓冲区大小

派生值:

  • MSS(最大分段大小):MTU - DataSegmentOverhead = 1350 - 18 = 1332 字节
  • SendingInFlightSizeUplinkCapacity * 1024 * 1024 / MTU / (1000 / TTI),最小值 8
  • SendingBufferSizeWriteBuffer / MTU
  • ReceivingInFlightSizeDownlinkCapacity * 1024 * 1024 / MTU / (1000 / TTI),最小值 8

输出层

SegmentWriterkcp/output.go:12-13)序列化分段并写入网络:

go
type SegmentWriter interface {
    Write(seg Segment) error
}

SimpleSegmentWriteroutput.go:15-37)序列化到缓冲区后写入:

go
func (w *SimpleSegmentWriter) Write(seg Segment) error {
    w.buffer.Clear()
    rawBytes := w.buffer.Extend(seg.ByteSize())
    seg.Serialize(rawBytes)
    _, err := w.writer.Write(w.buffer.Bytes())
    return err
}

RetryableWriteroutput.go:39-53)以 100ms 间隔最多重试 5 次。

数据包读取

KCPPacketReaderkcp/io.go:7-20)将字节缓冲区解析为多个分段:

go
func (r *KCPPacketReader) Read(b []byte) []Segment {
    var result []Segment
    for len(b) > 0 {
        seg, x := ReadSegment(b)
        if seg == nil { break }
        result = append(result, seg)
        b = x
    }
    return result
}

ReadSegmentkcp/segment.go:276-303)解析 4 字节头部以确定分段类型,并委托给相应的 parse 方法。

UDP 掩码支持

mKCP 通过 UdpmaskManagerkcp/dialer.go:57-71)支持数据包混淆:

go
if streamSettings.UdpmaskManager != nil {
    wrapper := rawConn.(*internet.PacketConnWrapper)
    wrapper.Conn, _ = streamSettings.UdpmaskManager.WrapPacketConnClient(raw)
}

这包装 PacketConn 对 UDP 数据包进行变换,使其更难通过 DPI 识别。

实现说明

  • 无 FEC:与 xtaci 的原始 KCP 实现不同,Xray 的 mKCP 不实现前向纠错(FEC)。可靠性完全依靠重传。
  • 会话 ID:一个 16 位值,每次连接全局递增(dialer.go:18)。这将并发出站连接限制在约 65K 个后循环。
  • 30 秒空闲超时:30 秒内无传入数据的连接将被关闭(connection.go:613-615)。
  • 协程模型:每个连接有 2 个 updater 协程(data + ping),加上一个 fetchInput 协程用于读取。updater 通过信号量懒启动(WakeUp)。
  • 缓冲区所有权:DataSegment 拥有其负载缓冲区。分段处理后必须调用 Release() 将缓冲区返回池中。
  • Writer 会话跟踪:监听器的 Writer 结构体(listener.go:156-170)跟踪 ConnectionID,以便在关闭时从监听器中移除会话。
  • TLS over KCP:可选的 TLS 包装运行在 KCP 可靠流之上,而非原始 UDP 之上。这意味着 TLS 看到的是有序字节流。
  • 不支持 REALITY:mKCP 不支持 REALITY(仅支持标准 TLS),因为 REALITY 需要基于 TCP 传输的 TLS 1.3 ClientHello 操作。

用于重新实现目的的技术分析。