تعدد إرسال Mux
يسمح Mux (تعدد الإرسال) لتدفقات منطقية متعددة بمشاركة اتصال نقل واحد. هذا يُقلّل من زمن إنشاء الاتصال والحمل الزائد، خاصة عبر وسائل النقل ذات زمن الاستجابة العالي.
المصدر: common/mux/
البنية المعمارية
flowchart LR
subgraph Client
S1[Stream 1]
S2[Stream 2]
S3[Stream 3]
CM[ClientManager]
CW[ClientWorker]
end
subgraph Transport["Single Transport Connection"]
Frames["Mux Frames<br/>(interleaved)"]
end
subgraph Server
SW[ServerWorker]
D1[Dispatch 1]
D2[Dispatch 2]
D3[Dispatch 3]
end
S1 --> CM
S2 --> CM
S3 --> CM
CM --> CW
CW --> Frames
Frames --> SW
SW --> D1
SW --> D2
SW --> D3تنسيق الإطار
+--+--+--+--+--+--+--+--+--+--+--+--+--+--+
| Meta Len | Session ID | Status | Option |
| (2B BE) | (2B BE) | (1B) | (1B) |
+--+--+--+--+--+--+--+--+--+--+--+--+--+--+
For SessionStatusNew:
+--+--+--+--+--+--+--+--+
| Network | Port | Addr |
| (1B) | (2B) | (var)|
+--+--+--+--+--+--+--+--+
Optional (XUDP): [8B GlobalID]
Data payload:
+--+--+--+--+
| Data Len | (2B BE, followed by payload bytes)
+--+--+--+--+طول البيانات الوصفية
أول 2 بايت تُشفّر طول البيانات الوصفية التالية (معرف الجلسة + الحالة + الخيار + الهدف).
معرف الجلسة
معرف جلسة بطول 2 بايت. كل تدفق منطقي يحصل على معرف فريد داخل الاتصال. المعرف=0 مع شبكة=UDP يُستخدم للكشف عن XUDP.
حالة الجلسة
| القيمة | الاسم | الوصف |
|---|---|---|
0x01 | New | جلسة جديدة (تتضمن عنوان الهدف) |
0x02 | Keep | متابعة جلسة موجودة (البيانات تتبع) |
0x03 | End | الجلسة مُغلقة |
0x04 | KeepAlive | إبقاء الاتصال حيًا (بدون بيانات) |
أعلام الخيار
| البت | الاسم | الوصف |
|---|---|---|
0x01 | OptionData | الإطار يحتوي على حمولة بيانات |
0x02 | OptionError | الجلسة انتهت بخطأ |
شبكة الهدف
| القيمة | الشبكة |
|---|---|
0x01 | TCP |
0x02 | UDP |
تنسيق العنوان
Port: 2 bytes big-endian
AddrType: 0x01=IPv4, 0x02=Domain, 0x03=IPv6
Address: 4B (IPv4), 1B+NB (domain len+domain), 16B (IPv6)جانب العميل
ClientManager
نقطة الدخول لـ Mux على جانب المخرج:
type ClientManager struct {
Enabled bool
Picker WorkerPicker
}
func (m *ClientManager) Dispatch(ctx, link) error {
for i := 0; i < 16; i++ {
worker, _ := m.Picker.PickAvailable()
if worker.Dispatch(ctx, link) {
return nil
}
}
return errors.New("unable to find an available mux client")
}ClientWorker
كل ClientWorker يُدير اتصال نقل واحد مع جلسات متعددة:
type ClientWorker struct {
sessionManager *SessionManager // tracks active sessions
link transport.Link // transport connection
done *done.Instance
}يكون العامل "ممتلئًا" عندما يصل إلى الحد الأقصى للجلسات المتزامنة (قابل للتهيئة، الافتراضي ~128).
IncrementalWorkerPicker
يُنشئ عمالاً جددًا عند الحاجة:
func (p *IncrementalWorkerPicker) PickAvailable() (*ClientWorker, error) {
p.access.Lock()
defer p.access.Unlock()
idx := p.findAvailable()
if idx >= 0 {
return p.workers[idx], nil
}
// All workers full: create a new one
worker, err := p.Factory.Create()
p.workers = append(p.workers, worker)
return worker, nil
}ClientWorker.Dispatch
func (m *ClientWorker) Dispatch(ctx, link) bool {
if m.IsFull() || m.Closed() {
return false
}
// Allocate session
s := m.sessionManager.Allocate()
// Build frame metadata
meta := FrameMetadata{
SessionID: s.ID,
SessionStatus: SessionStatusNew,
Target: outbound.Target,
Option: OptionData,
}
// Write frame to transport
// Copy data from link.Reader to transport (with framing)
// Copy data from transport to link.Writer (with deframing)
}جانب الخادم
ServerWorker
يفك تعدد الإرسال للإطارات الواردة ويوزع كل جلسة:
type ServerWorker struct {
dispatcher routing.Dispatcher
link *transport.Link
sessionManager *SessionManager
}
func NewServerWorker(ctx, dispatcher, link) (*ServerWorker, error) {
// Start worker goroutine
go worker.run(ctx)
return worker, nil
}حلقة معالجة الإطارات
func (w *ServerWorker) run(ctx) {
for {
// Read frame metadata
meta := new(FrameMetadata)
meta.Unmarshal(reader)
switch meta.SessionStatus {
case SessionStatusNew:
// Create new session
// Dispatch via routing
s := w.sessionManager.Add(meta.SessionID)
link := dispatcher.Dispatch(ctx, meta.Target)
// Start copying between session and link
case SessionStatusKeep:
// Write data to existing session
s := w.sessionManager.Get(meta.SessionID)
// Read data payload and write to session
case SessionStatusEnd:
// Close session
w.sessionManager.Remove(meta.SessionID)
case SessionStatusKeepAlive:
// No-op, just prevents connection timeout
}
}
}مدير الجلسات
type SessionManager struct {
sessions map[uint16]*Session
count int
closed bool
}
type Session struct {
input buf.Reader // data from transport
output buf.Writer // data to transport
parent *SessionManager
ID uint16
transferType protocol.TransferType
}تأطير البيانات
عند كتابة بيانات لجلسة موجودة:
// Writer wraps each write in a mux frame:
frame = [2B meta_len][2B session_id][status=Keep][option=Data]
data = [2B data_len][payload]عند القراءة، يُحلّل القارئ الإطارات ويوجّه البيانات إلى الجلسة الصحيحة.
إعداد Mux
{
"outbounds": [{
"mux": {
"enabled": true,
"concurrency": 8, // max streams per connection
"xudpConcurrency": 16,
"xudpProxyUDP443": "reject"
}
}]
}ملاحظات التنفيذ
مساحة معرف الجلسة: 16-بت، لذا الحد الأقصى 65535 جلسة لكل اتصال. عمليًا، يُحدّد التزامن بحوالي ~128 للأداء.
الحمل الزائد للتأطير: كل إطار بيانات يُضيف 6-8 بايت من البيانات الوصفية. لحزم UDP الصغيرة، هذا الحمل الزائد كبير.
حجب رأس الصف: جميع الجلسات تتشارك اتصال TCP واحد. إذا توقفت جلسة واحدة (مثلاً، إعادة إرسال)، تتأثر جميع الجلسات. هذا هو المقايضة الأساسية لـ Mux.
KeepAlive: إطار
SessionStatusKeepAliveيمنع إغلاق اتصال النقل بسبب مهلة الخمول. يُرسل دوريًا عندما لا تتدفق بيانات.دورة حياة العامل: يُنشأ العمال عند الحاجة ويُنظّفون دوريًا (كل 30 ثانية). يُغلق العامل عندما تنتهي جميع جلساته ولا تصل جلسات جديدة.
كشف XUDP: معرف الجلسة 0 + شبكة UDP يشير إلى وضع XUDP. انظر بروتوكول XUDP للتفاصيل.