mKCP 传输
简介
mKCP(modified KCP)是一种基于 UDP 的可靠传输协议,灵感来自 skywind3000 的 KCP 协议。它在 UDP 之上提供可靠、有序的数据传递,具有可配置的拥塞控制,以更高的带宽消耗换取更低的延迟。mKCP 适用于 TCP 性能较差(高丢包、高延迟)但 UDP 可用的环境。它支持可选的 TLS 包装和数据包头部伪装。
协议注册
以 "mkcp" 名称注册(transport/internet/kcp/kcp.go:9):
const protocolName = "mkcp"- 拨号器:
kcp/dialer.go:95-97 - 监听器:
kcp/listener.go:176-178 - 配置:
kcp/config.go:80-84
架构概述
flowchart TD
subgraph "应用层"
APP[应用程序读/写]
end
subgraph "KCP 连接"
RW[ReceivingWorker / SendingWorker]
RTT[RoundTripInfo]
UPD[Updater - 定期刷新]
end
subgraph "分段层"
DS[DataSegment]
AS[AckSegment]
CS[CmdOnlySegment]
end
subgraph "输出"
SW[SegmentWriter]
RSW[RetryableWriter]
end
subgraph "网络"
UDP[UDP 套接字 / PacketConn]
end
APP --> RW
RW --> UPD
UPD -->|flush| RW
RW --> DS & AS & CS
DS & AS & CS --> SW --> RSW --> UDP
UDP --> |fetchInput| RW
RTT -->|timeout/rto| RW拨号流程
DialKCP(kcp/dialer.go:48-93)通过 UDP 套接字创建 KCP 连接:
func DialKCP(ctx context.Context, dest net.Destination,
streamSettings *internet.MemoryStreamConfig) (stat.Connection, error) {
dest.Network = net.Network_UDP // 强制使用 UDP
rawConn, _ := internet.DialSystem(ctx, dest, streamSettings.SocketSettings)
// 可选的 UDP 掩码
if streamSettings.UdpmaskManager != nil {
wrapper := rawConn.(*internet.PacketConnWrapper)
wrapper.Conn, _ = streamSettings.UdpmaskManager.WrapPacketConnClient(raw)
}
kcpSettings := streamSettings.ProtocolSettings.(*Config)
conv := uint16(atomic.AddUint32(&globalConv, 1))
session := NewConnection(ConnMetadata{
LocalAddr: rawConn.LocalAddr(),
RemoteAddr: rawConn.RemoteAddr(),
Conversation: conv,
}, rawConn, rawConn, kcpSettings)
go fetchInput(ctx, rawConn, reader, session)
// 可选的 TLS
if config := tls.ConfigFromStreamSettings(streamSettings); config != nil {
return tls.Client(session, config.GetTLSConfig(tls.WithDestination(dest))), nil
}
return session, nil
}fetchInput 协程(kcp/dialer.go:20-45)持续从 UDP 套接字读取并将分段提供给 KCP 连接:
func fetchInput(_ context.Context, input io.Reader, reader PacketReader, conn *Connection) {
cache := make(chan *buf.Buffer, 1024)
go func() {
for {
payload := buf.New()
payload.ReadFrom(input)
cache <- payload
}
}()
for payload := range cache {
segments := reader.Read(payload.Bytes())
conn.Input(segments)
}
}监听流程
NewListener(kcp/listener.go:35-61)创建 UDP hub 并将传入数据包分发到 KCP 会话:
func NewListener(ctx context.Context, address net.Address, port net.Port,
streamSettings *internet.MemoryStreamConfig, addConn internet.ConnHandler) (*Listener, error) {
hub, _ := udp.ListenUDP(ctx, address, port, streamSettings, udp.HubCapacity(1024))
go l.handlePackets()
return l, nil
}会话解复用
OnReceive(kcp/listener.go:70-122)通过 {远程地址, 端口, 会话 ID} 将传入的 UDP 数据包解复用到 KCP 会话:
func (l *Listener) OnReceive(payload *buf.Buffer, src net.Destination) {
segments := l.reader.Read(payload.Bytes())
conv := segments[0].Conversation()
id := ConnectionID{Remote: src.Address, Port: src.Port, Conv: conv}
conn, found := l.sessions[id]
if !found {
// 创建新的 KCP 连接
conn = NewConnection(ConnMetadata{...}, writer, writer, l.config)
l.addConn(netConn) // 可选的 TLS 包装
l.sessions[id] = conn
}
conn.Input(segments)
}分段线格式
所有分段共享 4 字节头部(kcp/segment.go:276-303):
+--+--+--+--+
| Conv |Cmd|Opt|
+--+--+--+--+- Conv(2 字节,大端序):会话 ID
- Cmd(1 字节):命令类型
- Opt(1 字节):分段选项(第 0 位 = 关闭)
DataSegment(Cmd=1)
偏移 大小 字段
0 2 Conv
2 1 Cmd (0x01)
3 1 Option
4 4 Timestamp
8 4 Number(序列号)
12 4 SendingNext
16 2 DataLen
18 N Payload每个数据分段总开销:18 字节(DataSegmentOverhead = 18)。
AckSegment(Cmd=0)
偏移 大小 字段
0 2 Conv
2 1 Cmd (0x00)
3 1 Option
4 4 ReceivingWindow
8 4 ReceivingNext
12 4 Timestamp
16 1 Count
17 4*N NumberList(已确认的序列号)每个分段最多 128 个 ACK 号(ackNumberLimit = 128)。
CmdOnlySegment(Cmd=2,3)
偏移 大小 字段
0 2 Conv
2 1 Cmd (0x02=终止, 0x03=Ping)
3 1 Option
4 4 SendingNext
8 4 ReceivingNext
12 4 PeerRTO固定 16 字节。用于 keep-alive ping 和连接终止。
KCP 连接
状态机
Connection(kcp/connection.go:179-204)维护一个包含 6 种状态的状态机:
stateDiagram-v2
[*] --> Active
Active --> ReadyToClose: Close()
Active --> PeerClosed: 对端发送 Close
Active --> PeerTerminating: 对端发送 Terminate
ReadyToClose --> Terminating: 发送缓冲区为空 或 对端 Close/Terminate
PeerClosed --> Terminating: Close()
PeerTerminating --> Terminating: 超时 (4s)
PeerTerminating --> Terminated: Close()
Terminating --> Terminated: 超时 (8s)
ReadyToClose --> Terminating: 超时 (15s)
Terminated --> [*]: Terminate()Updater 系统
两个 Updater 协程驱动连接(kcp/connection.go:123-170):
- dataUpdater:当有发送/接收工作时以 TTI 间隔运行。刷新 ACK 并重传数据。
- pingUpdater:每 5 秒运行。发送 keep-alive ping 并处理超时。
两者均使用基于信号量的唤醒机制以避免不必要的协程(connection.go:142-148):
func (u *Updater) WakeUp() {
select {
case <-u.notifier.Wait():
go u.run()
default:
}
}往返时间跟踪
RoundTripInfo(kcp/connection.go:52-121)实现 RFC 6298 RTT 估算:
func (info *RoundTripInfo) Update(rtt uint32, current uint32) {
// SRTT = 7/8 * SRTT + 1/8 * RTT
// RTTVAR = 3/4 * RTTVAR + 1/4 * |SRTT - RTT|
// RTO = SRTT + max(4*RTTVAR, minRTT)
// RTO 上限为 10 秒,然后乘以 5/4
}输入处理
Connection.Input(kcp/connection.go:560-605)按类型处理传入分段:
func (c *Connection) Input(segments []Segment) {
for _, seg := range segments {
switch seg := seg.(type) {
case *DataSegment:
c.receivingWorker.ProcessSegment(seg)
c.dataInput.Signal() // 唤醒 Read()
case *AckSegment:
c.sendingWorker.ProcessSegment(current, seg, c.roundTrip.Timeout())
c.dataOutput.Signal() // 唤醒 Write()
case *CmdOnlySegment:
// 处理 Terminate,更新对端 RTO
}
}
}刷新周期
flush(kcp/connection.go:607-644)由 updater 定期调用:
- 关闭空闲连接(30 秒超时)(
connection.go:613-615) - 发送缓冲区为空时转换为
ReadyToClose(connection.go:616-618) - 在
Terminating状态下发送CommandTerminate(connection.go:620-628) - 刷新 ACK 列表(
connection.go:638) - 刷新发送窗口(重传/发送新数据)(
connection.go:639) - 每 3 秒发送 keep-alive ping(
connection.go:641-643)
配置
来自 kcp/config.go:
| 参数 | 默认值 | 描述 |
|---|---|---|
| MTU | 1350 | 最大传输单元(字节) |
| TTI | 50 | 传输时间间隔(毫秒) |
| UplinkCapacity | 5 | 上传带宽(MB/s) |
| DownlinkCapacity | 20 | 下载带宽(MB/s) |
| WriteBuffer | 2 MB | 写缓冲区大小 |
派生值:
- MSS(最大分段大小):
MTU - DataSegmentOverhead= 1350 - 18 = 1332 字节 - SendingInFlightSize:
UplinkCapacity * 1024 * 1024 / MTU / (1000 / TTI),最小值 8 - SendingBufferSize:
WriteBuffer / MTU - ReceivingInFlightSize:
DownlinkCapacity * 1024 * 1024 / MTU / (1000 / TTI),最小值 8
输出层
SegmentWriter(kcp/output.go:12-13)序列化分段并写入网络:
type SegmentWriter interface {
Write(seg Segment) error
}SimpleSegmentWriter(output.go:15-37)序列化到缓冲区后写入:
func (w *SimpleSegmentWriter) Write(seg Segment) error {
w.buffer.Clear()
rawBytes := w.buffer.Extend(seg.ByteSize())
seg.Serialize(rawBytes)
_, err := w.writer.Write(w.buffer.Bytes())
return err
}RetryableWriter(output.go:39-53)以 100ms 间隔最多重试 5 次。
数据包读取
KCPPacketReader(kcp/io.go:7-20)将字节缓冲区解析为多个分段:
func (r *KCPPacketReader) Read(b []byte) []Segment {
var result []Segment
for len(b) > 0 {
seg, x := ReadSegment(b)
if seg == nil { break }
result = append(result, seg)
b = x
}
return result
}ReadSegment(kcp/segment.go:276-303)解析 4 字节头部以确定分段类型,并委托给相应的 parse 方法。
UDP 掩码支持
mKCP 通过 UdpmaskManager(kcp/dialer.go:57-71)支持数据包混淆:
if streamSettings.UdpmaskManager != nil {
wrapper := rawConn.(*internet.PacketConnWrapper)
wrapper.Conn, _ = streamSettings.UdpmaskManager.WrapPacketConnClient(raw)
}这包装 PacketConn 对 UDP 数据包进行变换,使其更难通过 DPI 识别。
实现说明
- 无 FEC:与 xtaci 的原始 KCP 实现不同,Xray 的 mKCP 不实现前向纠错(FEC)。可靠性完全依靠重传。
- 会话 ID:一个 16 位值,每次连接全局递增(
dialer.go:18)。这将并发出站连接限制在约 65K 个后循环。 - 30 秒空闲超时:30 秒内无传入数据的连接将被关闭(
connection.go:613-615)。 - 协程模型:每个连接有 2 个 updater 协程(data + ping),加上一个
fetchInput协程用于读取。updater 通过信号量懒启动(WakeUp)。 - 缓冲区所有权:DataSegment 拥有其负载缓冲区。分段处理后必须调用
Release()将缓冲区返回池中。 - Writer 会话跟踪:监听器的
Writer结构体(listener.go:156-170)跟踪ConnectionID,以便在关闭时从监听器中移除会话。 - TLS over KCP:可选的 TLS 包装运行在 KCP 可靠流之上,而非原始 UDP 之上。这意味着 TLS 看到的是有序字节流。
- 不支持 REALITY:mKCP 不支持 REALITY(仅支持标准 TLS),因为 REALITY 需要基于 TCP 传输的 TLS 1.3 ClientHello 操作。