نقل mKCP
مقدمة
mKCP (KCP المعدّل) هو نقل موثوق قائم على UDP مستوحى من بروتوكول KCP (بواسطة skywind3000). يوفر تسليمًا موثوقًا ومرتبًا عبر UDP مع تحكم قابل للتكوين في الازدحام، مع مقايضة استخدام نطاق ترددي أعلى مقابل زمن وصول أقل. mKCP مفيد في البيئات التي يكون فيها أداء TCP ضعيفًا (فقدان حزم عالي، زمن وصول عالي) لكن UDP متاح. يدعم تغليف TLS اختياري وتشويش رؤوس الحزم.
تسجيل البروتوكول
مسجل باسم "mkcp" (transport/internet/kcp/kcp.go:9):
const protocolName = "mkcp"- المتصل:
kcp/dialer.go:95-97 - المستمع:
kcp/listener.go:176-178 - الإعدادات:
kcp/config.go:80-84
نظرة عامة على البنية
flowchart TD
subgraph "Application Layer"
APP[Application Read/Write]
end
subgraph "KCP Connection"
RW[ReceivingWorker / SendingWorker]
RTT[RoundTripInfo]
UPD[Updater - periodic flush]
end
subgraph "Segment Layer"
DS[DataSegment]
AS[AckSegment]
CS[CmdOnlySegment]
end
subgraph "Output"
SW[SegmentWriter]
RSW[RetryableWriter]
end
subgraph "Network"
UDP[UDP Socket / PacketConn]
end
APP --> RW
RW --> UPD
UPD -->|flush| RW
RW --> DS & AS & CS
DS & AS & CS --> SW --> RSW --> UDP
UDP --> |fetchInput| RW
RTT -->|timeout/rto| RWتدفق الاتصال
DialKCP (kcp/dialer.go:48-93) ينشئ اتصال KCP عبر مقبس UDP:
func DialKCP(ctx context.Context, dest net.Destination,
streamSettings *internet.MemoryStreamConfig) (stat.Connection, error) {
dest.Network = net.Network_UDP // Force UDP
rawConn, _ := internet.DialSystem(ctx, dest, streamSettings.SocketSettings)
// Optional UDP mask
if streamSettings.UdpmaskManager != nil {
wrapper := rawConn.(*internet.PacketConnWrapper)
wrapper.Conn, _ = streamSettings.UdpmaskManager.WrapPacketConnClient(raw)
}
kcpSettings := streamSettings.ProtocolSettings.(*Config)
conv := uint16(atomic.AddUint32(&globalConv, 1))
session := NewConnection(ConnMetadata{
LocalAddr: rawConn.LocalAddr(),
RemoteAddr: rawConn.RemoteAddr(),
Conversation: conv,
}, rawConn, rawConn, kcpSettings)
go fetchInput(ctx, rawConn, reader, session)
// Optional TLS
if config := tls.ConfigFromStreamSettings(streamSettings); config != nil {
return tls.Client(session, config.GetTLSConfig(tls.WithDestination(dest))), nil
}
return session, nil
}الروتين المتزامن fetchInput (kcp/dialer.go:20-45) يقرأ باستمرار من مقبس UDP ويغذي المقاطع إلى اتصال KCP:
func fetchInput(_ context.Context, input io.Reader, reader PacketReader, conn *Connection) {
cache := make(chan *buf.Buffer, 1024)
go func() {
for {
payload := buf.New()
payload.ReadFrom(input)
cache <- payload
}
}()
for payload := range cache {
segments := reader.Read(payload.Bytes())
conn.Input(segments)
}
}تدفق الاستماع
NewListener (kcp/listener.go:35-61) ينشئ محور UDP ويوزع الحزم الواردة على جلسات KCP:
func NewListener(ctx context.Context, address net.Address, port net.Port,
streamSettings *internet.MemoryStreamConfig, addConn internet.ConnHandler) (*Listener, error) {
hub, _ := udp.ListenUDP(ctx, address, port, streamSettings, udp.HubCapacity(1024))
go l.handlePackets()
return l, nil
}فك تعدد إرسال الجلسات
OnReceive (kcp/listener.go:70-122) يفك تعدد إرسال حزم UDP الواردة إلى جلسات KCP بواسطة {العنوان البعيد، المنفذ، معرف المحادثة}:
func (l *Listener) OnReceive(payload *buf.Buffer, src net.Destination) {
segments := l.reader.Read(payload.Bytes())
conv := segments[0].Conversation()
id := ConnectionID{Remote: src.Address, Port: src.Port, Conv: conv}
conn, found := l.sessions[id]
if !found {
// Create new KCP connection
conn = NewConnection(ConnMetadata{...}, writer, writer, l.config)
l.addConn(netConn) // with optional TLS wrapping
l.sessions[id] = conn
}
conn.Input(segments)
}تنسيق مقطع السلك
جميع المقاطع تتشارك رأسًا من 4 بايت (kcp/segment.go:276-303):
+--+--+--+--+
| Conv |Cmd|Opt|
+--+--+--+--+- Conv (2 بايت، ترتيب كبير): معرف المحادثة
- Cmd (1 بايت): نوع الأمر
- Opt (1 بايت): خيارات المقطع (البت 0 = إغلاق)
DataSegment (Cmd=1)
Offset Size Field
0 2 Conv
2 1 Cmd (0x01)
3 1 Option
4 4 Timestamp
8 4 Number (sequence number)
12 4 SendingNext
16 2 DataLen
18 N Payloadإجمالي الحمل: 18 بايت لكل مقطع بيانات (DataSegmentOverhead = 18).
AckSegment (Cmd=0)
Offset Size Field
0 2 Conv
2 1 Cmd (0x00)
3 1 Option
4 4 ReceivingWindow
8 4 ReceivingNext
12 4 Timestamp
16 1 Count
17 4*N NumberList (acknowledged sequence numbers)حتى 128 رقم ACK لكل مقطع (ackNumberLimit = 128).
CmdOnlySegment (Cmd=2,3)
Offset Size Field
0 2 Conv
2 1 Cmd (0x02=Terminate, 0x03=Ping)
3 1 Option
4 4 SendingNext
8 4 ReceivingNext
12 4 PeerRTOحجم ثابت 16 بايت. يُستخدم لنبضات البقاء وإنهاء الاتصال.
اتصال KCP
آلة الحالة
Connection (kcp/connection.go:179-204) يحتفظ بآلة حالة مع 6 حالات:
stateDiagram-v2
[*] --> Active
Active --> ReadyToClose: Close()
Active --> PeerClosed: peer sends Close
Active --> PeerTerminating: peer sends Terminate
ReadyToClose --> Terminating: send buffer empty OR peer Close/Terminate
PeerClosed --> Terminating: Close()
PeerTerminating --> Terminating: timeout (4s)
PeerTerminating --> Terminated: Close()
Terminating --> Terminated: timeout (8s)
ReadyToClose --> Terminating: timeout (15s)
Terminated --> [*]: Terminate()نظام المُحدّث
روتينان متزامنان Updater يقودان الاتصال (kcp/connection.go:123-170):
- dataUpdater: يعمل بفاصل TTI عند الحاجة لعمل إرسال/استقبال. يفرغ ACK ويعيد إرسال البيانات.
- pingUpdater: يعمل كل 5 ثوانٍ. يرسل نبضات البقاء ويعالج انتهاء المهلة.
كلاهما يستخدم آلية إيقاظ قائمة على الإشارة لتجنب الروتينات المتزامنة غير الضرورية (connection.go:142-148):
func (u *Updater) WakeUp() {
select {
case <-u.notifier.Wait():
go u.run()
default:
}
}تتبع زمن الرحلة الذهاب والعودة
RoundTripInfo (kcp/connection.go:52-121) ينفذ تقدير RTT وفقًا لـ RFC 6298:
func (info *RoundTripInfo) Update(rtt uint32, current uint32) {
// SRTT = 7/8 * SRTT + 1/8 * RTT
// RTTVAR = 3/4 * RTTVAR + 1/4 * |SRTT - RTT|
// RTO = SRTT + max(4*RTTVAR, minRTT)
// RTO capped at 10 seconds, then multiplied by 5/4
}معالجة المدخلات
Connection.Input (kcp/connection.go:560-605) يعالج المقاطع الواردة حسب النوع:
func (c *Connection) Input(segments []Segment) {
for _, seg := range segments {
switch seg := seg.(type) {
case *DataSegment:
c.receivingWorker.ProcessSegment(seg)
c.dataInput.Signal() // wake up Read()
case *AckSegment:
c.sendingWorker.ProcessSegment(current, seg, c.roundTrip.Timeout())
c.dataOutput.Signal() // wake up Write()
case *CmdOnlySegment:
// Handle Terminate, update peer RTO
}
}
}دورة التفريغ
flush (kcp/connection.go:607-644) يُستدعى دوريًا بواسطة المُحدّثين:
- إغلاق الاتصالات الخاملة (مهلة 30 ثانية) (
connection.go:613-615) - الانتقال إلى
ReadyToCloseعندما يكون مخزن الإرسال فارغًا (connection.go:616-618) - إرسال
CommandTerminateفي حالةTerminating(connection.go:620-628) - تفريغ قائمة ACK (
connection.go:638) - تفريغ نافذة الإرسال (إعادة إرسال/إرسال جديد) (
connection.go:639) - إرسال نبضة بقاء كل 3 ثوانٍ (
connection.go:641-643)
الإعدادات
من kcp/config.go:
| المعامل | الافتراضي | الوصف |
|---|---|---|
| MTU | 1350 | وحدة الإرسال القصوى (بايت) |
| TTI | 50 | فاصل وقت الإرسال (مللي ثانية) |
| UplinkCapacity | 5 | نطاق عرض الرفع (ميجابايت/ثانية) |
| DownlinkCapacity | 20 | نطاق عرض التنزيل (ميجابايت/ثانية) |
| WriteBuffer | 2 ميجابايت | حجم مخزن الكتابة المؤقت |
القيم المشتقة:
- MSS (الحد الأقصى لحجم المقطع):
MTU - DataSegmentOverhead= 1350 - 18 = 1332 بايت - SendingInFlightSize:
UplinkCapacity * 1024 * 1024 / MTU / (1000 / TTI)، الحد الأدنى 8 - SendingBufferSize:
WriteBuffer / MTU - ReceivingInFlightSize:
DownlinkCapacity * 1024 * 1024 / MTU / (1000 / TTI)، الحد الأدنى 8
طبقة الإخراج
SegmentWriter (kcp/output.go:12-13) يُسلسل المقاطع ويكتب إلى الشبكة:
type SegmentWriter interface {
Write(seg Segment) error
}SimpleSegmentWriter (output.go:15-37) يُسلسل إلى مخزن مؤقت ويكتب:
func (w *SimpleSegmentWriter) Write(seg Segment) error {
w.buffer.Clear()
rawBytes := w.buffer.Extend(seg.ByteSize())
seg.Serialize(rawBytes)
_, err := w.writer.Write(w.buffer.Bytes())
return err
}RetryableWriter (output.go:39-53) يغلف الكتابات مع ما يصل إلى 5 محاولات إعادة بفواصل 100 مللي ثانية.
قراءة الحزم
KCPPacketReader (kcp/io.go:7-20) يحلل مخزن البايت إلى مقاطع متعددة:
func (r *KCPPacketReader) Read(b []byte) []Segment {
var result []Segment
for len(b) > 0 {
seg, x := ReadSegment(b)
if seg == nil { break }
result = append(result, seg)
b = x
}
return result
}ReadSegment (kcp/segment.go:276-303) يحلل الرأس ذا 4 بايت لتحديد نوع المقطع ويفوض إلى طريقة parse المناسبة.
دعم قناع UDP
يدعم mKCP تشويش الحزم عبر UdpmaskManager (kcp/dialer.go:57-71):
if streamSettings.UdpmaskManager != nil {
wrapper := rawConn.(*internet.PacketConnWrapper)
wrapper.Conn, _ = streamSettings.UdpmaskManager.WrapPacketConnClient(raw)
}هذا يغلف PacketConn لتحويل حزم UDP، مما يجعلها أصعب في التعرف عليها عبر فحص الحزم العميق (DPI).
ملاحظات التنفيذ
- بدون FEC: على عكس تطبيق KCP الأصلي بواسطة xtaci، لا ينفذ mKCP في Xray تصحيح الأخطاء الأمامي (FEC). الموثوقية تأتي فقط من إعادة الإرسال.
- معرف المحادثة: قيمة 16 بت يتم زيادتها عالميًا لكل اتصال (
dialer.go:18). هذا يحد الاتصالات الصادرة المتزامنة إلى ~65 ألف قبل الالتفاف. - مهلة خمول 30 ثانية: الاتصالات التي لا تحتوي على بيانات واردة لمدة 30 ثانية يتم إغلاقها (
connection.go:613-615). - نموذج الروتينات المتزامنة: كل اتصال لديه روتينان مُحدّثان (بيانات + نبضة)، بالإضافة إلى روتين
fetchInputللقراءة. المُحدّثات تبدأ بشكل كسول عبر إشارة (WakeUp). - ملكية المخازن المؤقتة: DataSegments تمتلك مخازنها المؤقتة للحمولة. يجب استدعاء
Release()على المقاطع بعد المعالجة لإعادة المخازن المؤقتة إلى التجمع. - تتبع جلسة الكاتب: بنية
Writerفي المستمع (listener.go:156-170) تتتبعConnectionIDحتى تتمكن من إزالة الجلسة من المستمع عند الإغلاق. - TLS عبر KCP: تغليف TLS الاختياري يعمل فوق تدفق KCP الموثوق، وليس فوق UDP الخام. هذا يعني أن TLS يرى تدفق بايت مرتب.
- بدون REALITY: لا يدعم mKCP بروتوكول REALITY (فقط TLS القياسي)، لأن REALITY يتطلب التلاعب بـ ClientHello في TLS 1.3 الذي يفترض نقل TCP.