# mKCP Transport

## Introduction
mKCP (modified KCP) is a UDP-based reliable transport inspired by the KCP protocol (by skywind3000). It provides reliable, ordered delivery over UDP with configurable congestion control, trading higher bandwidth usage for lower latency. mKCP is useful in environments where TCP performs poorly (high packet loss, high latency) but UDP is available. It supports optional TLS wrapping and packet header obfuscation.
## Protocol Registration
Registered as "mkcp" (transport/internet/kcp/kcp.go:9):

```go
const protocolName = "mkcp"
```

- Dialer: `kcp/dialer.go:95-97`
- Listener: `kcp/listener.go:176-178`
- Config: `kcp/config.go:80-84`
## Architecture Overview
```mermaid
flowchart TD
    subgraph "Application Layer"
        APP[Application Read/Write]
    end
    subgraph "KCP Connection"
        RW[ReceivingWorker / SendingWorker]
        RTT[RoundTripInfo]
        UPD[Updater - periodic flush]
    end
    subgraph "Segment Layer"
        DS[DataSegment]
        AS[AckSegment]
        CS[CmdOnlySegment]
    end
    subgraph "Output"
        SW[SegmentWriter]
        RSW[RetryableWriter]
    end
    subgraph "Network"
        UDP[UDP Socket / PacketConn]
    end
    APP --> RW
    RW --> UPD
    UPD -->|flush| RW
    RW --> DS & AS & CS
    DS & AS & CS --> SW --> RSW --> UDP
    UDP -->|fetchInput| RW
    RTT -->|timeout/rto| RW
```

## Dial Flow
DialKCP (kcp/dialer.go:48-93) creates a KCP connection over a UDP socket:

```go
func DialKCP(ctx context.Context, dest net.Destination,
    streamSettings *internet.MemoryStreamConfig) (stat.Connection, error) {
    dest.Network = net.Network_UDP // Force UDP
    rawConn, _ := internet.DialSystem(ctx, dest, streamSettings.SocketSettings)

    // Optional UDP mask
    if streamSettings.UdpmaskManager != nil {
        wrapper := rawConn.(*internet.PacketConnWrapper)
        wrapper.Conn, _ = streamSettings.UdpmaskManager.WrapPacketConnClient(wrapper.Conn)
    }

    kcpSettings := streamSettings.ProtocolSettings.(*Config)
    conv := uint16(atomic.AddUint32(&globalConv, 1))
    session := NewConnection(ConnMetadata{
        LocalAddr:    rawConn.LocalAddr(),
        RemoteAddr:   rawConn.RemoteAddr(),
        Conversation: conv,
    }, rawConn, rawConn, kcpSettings)

    // reader is a PacketReader built from kcpSettings (header/security setup elided)
    go fetchInput(ctx, rawConn, reader, session)

    // Optional TLS
    if config := tls.ConfigFromStreamSettings(streamSettings); config != nil {
        return tls.Client(session, config.GetTLSConfig(tls.WithDestination(dest))), nil
    }
    return session, nil
}
```

The fetchInput goroutine (kcp/dialer.go:20-45) continuously reads from the UDP socket and feeds segments to the KCP connection:
```go
func fetchInput(_ context.Context, input io.Reader, reader PacketReader, conn *Connection) {
    cache := make(chan *buf.Buffer, 1024)
    go func() {
        for {
            payload := buf.New()
            if _, err := payload.ReadFrom(input); err != nil {
                payload.Release()
                close(cache)
                return
            }
            cache <- payload
        }
    }()

    for payload := range cache {
        segments := reader.Read(payload.Bytes())
        conn.Input(segments)
    }
}
```

## Listen Flow
NewListener (kcp/listener.go:35-61) creates a UDP hub and dispatches incoming packets to KCP sessions:

```go
func NewListener(ctx context.Context, address net.Address, port net.Port,
    streamSettings *internet.MemoryStreamConfig, addConn internet.ConnHandler) (*Listener, error) {
    l := &Listener{
        addConn:  addConn,
        sessions: make(map[ConnectionID]*Connection),
        // remaining fields (reader, config, TLS) omitted here
    }
    hub, _ := udp.ListenUDP(ctx, address, port, streamSettings, udp.HubCapacity(1024))
    l.hub = hub
    go l.handlePackets()
    return l, nil
}
```

## Session Demultiplexing
OnReceive (kcp/listener.go:70-122) demultiplexes incoming UDP packets into KCP sessions by {Remote Address, Port, Conversation ID}:

```go
func (l *Listener) OnReceive(payload *buf.Buffer, src net.Destination) {
    segments := l.reader.Read(payload.Bytes())
    conv := segments[0].Conversation()
    id := ConnectionID{Remote: src.Address, Port: src.Port, Conv: conv}

    conn, found := l.sessions[id]
    if !found {
        // Create new KCP connection
        conn = NewConnection(ConnMetadata{...}, writer, writer, l.config)
        l.addConn(conn) // optionally wrapped in TLS first
        l.sessions[id] = conn
    }
    conn.Input(segments)
}
```

## Segment Wire Format
All segments share a 4-byte header (kcp/segment.go:276-303):

```
+------+-----+-----+
| Conv | Cmd | Opt |
+------+-----+-----+
```

- Conv (2 bytes, big-endian): Conversation ID
- Cmd (1 byte): Command type
- Opt (1 byte): Segment options (bit 0 = Close)
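As an illustrative sketch (not the Xray parser itself), the common header described above can be decoded with `encoding/binary`; the function name `parseHeader` is hypothetical:

```go
package main

import (
    "encoding/binary"
    "fmt"
)

// parseHeader decodes the 4-byte header shared by all mKCP segments:
// 2-byte big-endian Conversation ID, 1-byte Command, 1-byte Option.
func parseHeader(b []byte) (conv uint16, cmd, opt byte, ok bool) {
    if len(b) < 4 {
        return 0, 0, 0, false
    }
    conv = binary.BigEndian.Uint16(b) // Conversation ID
    cmd = b[2]                        // command type
    opt = b[3]                        // options; bit 0 = Close
    return conv, cmd, opt, true
}

func main() {
    pkt := []byte{0x00, 0x2A, 0x01, 0x01} // conv=42, Cmd=Data, Close flag set
    conv, cmd, opt, _ := parseHeader(pkt)
    fmt.Println(conv, cmd, opt&1 == 1) // 42 1 true
}
```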
### DataSegment (Cmd=1)

| Offset | Size | Field |
|---|---|---|
| 0 | 2 | Conv |
| 2 | 1 | Cmd (0x01) |
| 3 | 1 | Option |
| 4 | 4 | Timestamp |
| 8 | 4 | Number (sequence number) |
| 12 | 4 | SendingNext |
| 16 | 2 | DataLen |
| 18 | N | Payload |

Total overhead: 18 bytes per data segment (DataSegmentOverhead = 18).
### AckSegment (Cmd=0)

| Offset | Size | Field |
|---|---|---|
| 0 | 2 | Conv |
| 2 | 1 | Cmd (0x00) |
| 3 | 1 | Option |
| 4 | 4 | ReceivingWindow |
| 8 | 4 | ReceivingNext |
| 12 | 4 | Timestamp |
| 16 | 1 | Count |
| 17 | 4*N | NumberList (acknowledged sequence numbers) |

Up to 128 ACK numbers per segment (ackNumberLimit = 128).
### CmdOnlySegment (Cmd=2,3)

| Offset | Size | Field |
|---|---|---|
| 0 | 2 | Conv |
| 2 | 1 | Cmd (0x02=Terminate, 0x03=Ping) |
| 3 | 1 | Option |
| 4 | 4 | SendingNext |
| 8 | 4 | ReceivingNext |
| 12 | 4 | PeerRTO |

Fixed 16 bytes. Used for keep-alive pings and connection termination.
## KCP Connection

### State Machine
The Connection (kcp/connection.go:179-204) maintains a state machine with 6 states:
```mermaid
stateDiagram-v2
    [*] --> Active
    Active --> ReadyToClose: Close()
    Active --> PeerClosed: peer sends Close
    Active --> PeerTerminating: peer sends Terminate
    ReadyToClose --> Terminating: send buffer empty OR peer Close/Terminate
    PeerClosed --> Terminating: Close()
    PeerTerminating --> Terminating: timeout (4s)
    PeerTerminating --> Terminated: Close()
    Terminating --> Terminated: timeout (8s)
    ReadyToClose --> Terminating: timeout (15s)
    Terminated --> [*]: Terminate()
```

### Updater System
Two Updater goroutines drive the connection (kcp/connection.go:123-170):
- dataUpdater: Runs at TTI interval when send/receive work is needed. Flushes ACKs and retransmits data.
- pingUpdater: Runs every 5 seconds. Sends keep-alive pings and handles timeouts.
Both use a semaphore-based wake-up mechanism to avoid spawning unnecessary goroutines (connection.go:142-148):

```go
func (u *Updater) WakeUp() {
    select {
    case <-u.notifier.Wait():
        go u.run()
    default:
    }
}
```

### Round Trip Time Tracking
RoundTripInfo (kcp/connection.go:52-121) implements RFC 6298-style RTT estimation:

```go
func (info *RoundTripInfo) Update(rtt uint32, current uint32) {
    // SRTT = 7/8 * SRTT + 1/8 * RTT
    // RTTVAR = 3/4 * RTTVAR + 1/4 * |SRTT - RTT|
    // RTO = SRTT + max(4*RTTVAR, minRTT)
    // RTO capped at 10 seconds, then multiplied by 5/4
}
```

### Input Processing
Connection.Input (kcp/connection.go:560-605) handles incoming segments by type:

```go
func (c *Connection) Input(segments []Segment) {
    // current: elapsed time on the connection clock (setup elided)
    for _, seg := range segments {
        switch seg := seg.(type) {
        case *DataSegment:
            c.receivingWorker.ProcessSegment(seg)
            c.dataInput.Signal() // wake up Read()
        case *AckSegment:
            c.sendingWorker.ProcessSegment(current, seg, c.roundTrip.Timeout())
            c.dataOutput.Signal() // wake up Write()
        case *CmdOnlySegment:
            // Handle Terminate, update peer RTO
        }
    }
}
```

### Flush Cycle
flush (kcp/connection.go:607-644) is called periodically by the updaters:

- Close idle connections after a 30s timeout (`connection.go:613-615`)
- Transition from `ReadyToClose` to `Terminating` once the send buffer is empty (`connection.go:616-618`)
- Send `CommandTerminate` in the `Terminating` state (`connection.go:620-628`)
- Flush the ACK list (`connection.go:638`)
- Flush the sending window, retransmitting old data and sending new (`connection.go:639`)
- Send a keep-alive ping every 3 seconds (`connection.go:641-643`)
## Configuration
From kcp/config.go:
| Parameter | Default | Description |
|---|---|---|
| MTU | 1350 | Maximum Transmission Unit (bytes) |
| TTI | 50 | Transmission Time Interval (ms) |
| UplinkCapacity | 5 | Upload bandwidth (MB/s) |
| DownlinkCapacity | 20 | Download bandwidth (MB/s) |
| WriteBuffer | 2 MB | Write buffer size |
Derived values:

- MSS (Maximum Segment Size): `MTU - DataSegmentOverhead` = 1350 - 18 = 1332 bytes
- SendingInFlightSize: `UplinkCapacity * 1024 * 1024 / MTU / (1000 / TTI)`, minimum 8
- SendingBufferSize: `WriteBuffer / MTU`
- ReceivingInFlightSize: `DownlinkCapacity * 1024 * 1024 / MTU / (1000 / TTI)`, minimum 8
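Plugging in the defaults (a sketch with integer division, as Go's arithmetic would do it; `derivedValues` is an illustrative helper, not an Xray function):

```go
package main

import "fmt"

// derivedValues applies the formulas above to a given configuration.
// uplinkMB/downlinkMB are in MB/s, tti in ms, mtu/writeBuffer in bytes.
func derivedValues(mtu, tti, uplinkMB, downlinkMB, writeBuffer int) (mss, sendingInFlight, receivingInFlight, sendingBuffer int) {
    mss = mtu - 18 // DataSegmentOverhead = 18
    sendingInFlight = uplinkMB * 1024 * 1024 / mtu / (1000 / tti)
    if sendingInFlight < 8 {
        sendingInFlight = 8
    }
    receivingInFlight = downlinkMB * 1024 * 1024 / mtu / (1000 / tti)
    if receivingInFlight < 8 {
        receivingInFlight = 8
    }
    sendingBuffer = writeBuffer / mtu
    return
}

func main() {
    // Defaults: MTU=1350, TTI=50ms, Uplink=5MB/s, Downlink=20MB/s, WriteBuffer=2MB
    fmt.Println(derivedValues(1350, 50, 5, 20, 2*1024*1024)) // 1332 194 776 1553
}
```

So with the default uplink of 5 MB/s, at most 194 segments are kept in flight per TTI tick.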
## Output Layer
SegmentWriter (kcp/output.go:12-13) serializes segments and writes them to the network:

```go
type SegmentWriter interface {
    Write(seg Segment) error
}
```

SimpleSegmentWriter (output.go:15-37) serializes into a reusable buffer and writes it out:

```go
func (w *SimpleSegmentWriter) Write(seg Segment) error {
    w.buffer.Clear()
    rawBytes := w.buffer.Extend(seg.ByteSize())
    seg.Serialize(rawBytes)
    _, err := w.writer.Write(w.buffer.Bytes())
    return err
}
```

RetryableWriter (output.go:39-53) wraps writes with up to 5 retries at 100ms intervals.
## Packet Reading
KCPPacketReader (kcp/io.go:7-20) parses a byte buffer into multiple segments:

```go
func (r *KCPPacketReader) Read(b []byte) []Segment {
    var result []Segment
    for len(b) > 0 {
        seg, x := ReadSegment(b)
        if seg == nil {
            break
        }
        result = append(result, seg)
        b = x
    }
    return result
}
```

ReadSegment (kcp/segment.go:276-303) parses the 4-byte header to determine the segment type and delegates to the appropriate parse method.
## UDP Mask Support
mKCP supports packet obfuscation via UdpmaskManager (kcp/dialer.go:57-71):

```go
if streamSettings.UdpmaskManager != nil {
    wrapper := rawConn.(*internet.PacketConnWrapper)
    wrapper.Conn, _ = streamSettings.UdpmaskManager.WrapPacketConnClient(wrapper.Conn)
}
```

This wraps the PacketConn to transform UDP packets, making them harder to identify via DPI.
## Implementation Notes
- No FEC: Unlike the original KCP implementation by xtaci, Xray's mKCP does not implement Forward Error Correction (FEC). Reliability comes purely from retransmission.
- Conversation ID: A 16-bit value, globally incremented per connection (`dialer.go:18`). This limits concurrent outbound connections to ~65K before IDs wrap.
- 30-second idle timeout: Connections with no incoming data for 30 seconds are closed (`connection.go:613-615`).
- Goroutine model: Each connection has two updater goroutines (data + ping), plus a `fetchInput` goroutine for reading. The updaters are lazy-started via a semaphore (`WakeUp`).
- Buffer ownership: DataSegments own their payload buffers. Segments must be `Release()`d after processing to return their buffers to the pool.
- Writer session tracking: The listener's `Writer` struct (`listener.go:156-170`) tracks the `ConnectionID` so it can remove the session from the listener when closed.
- TLS over KCP: Optional TLS wrapping runs over the KCP reliable stream, not over raw UDP, so TLS sees an ordered byte stream.
- No REALITY: mKCP does not support REALITY (only standard TLS), since REALITY requires TLS 1.3 ClientHello manipulation that assumes a TCP transport.