Skip to content

تعدد إرسال Mux

يسمح Mux (تعدد الإرسال) لتدفقات منطقية متعددة بمشاركة اتصال نقل واحد. هذا يُقلّل من زمن إنشاء الاتصال والحمل الزائد، خاصة عبر وسائل النقل ذات زمن الاستجابة العالي.

المصدر: common/mux/

البنية المعمارية

mermaid
flowchart LR
    subgraph Client
        S1[Stream 1]
        S2[Stream 2]
        S3[Stream 3]
        CM[ClientManager]
        CW[ClientWorker]
    end

    subgraph Transport["Single Transport Connection"]
        Frames["Mux Frames<br/>(interleaved)"]
    end

    subgraph Server
        SW[ServerWorker]
        D1[Dispatch 1]
        D2[Dispatch 2]
        D3[Dispatch 3]
    end

    S1 --> CM
    S2 --> CM
    S3 --> CM
    CM --> CW
    CW --> Frames
    Frames --> SW
    SW --> D1
    SW --> D2
    SW --> D3

تنسيق الإطار

+--+--+--+--+--+--+--+--+--+--+--+--+--+--+
| Meta Len  | Session ID | Status | Option |
| (2B BE)   | (2B BE)    | (1B)   | (1B)   |
+--+--+--+--+--+--+--+--+--+--+--+--+--+--+

For SessionStatusNew:
+--+--+--+--+--+--+--+--+
| Network | Port  | Addr |
| (1B)    | (2B)  | (var)|
+--+--+--+--+--+--+--+--+

Optional (XUDP): [8B GlobalID]

Data payload:
+--+--+--+--+
| Data Len  |  (2B BE, followed by payload bytes)
+--+--+--+--+

طول البيانات الوصفية

أول 2 بايت تُشفّر طول البيانات الوصفية التالية (معرف الجلسة + الحالة + الخيار + الهدف).

معرف الجلسة

معرف جلسة بطول 2 بايت. كل تدفق منطقي يحصل على معرف فريد داخل الاتصال. المعرف=0 مع شبكة=UDP يُستخدم للكشف عن XUDP.

حالة الجلسة

القيمةالاسمالوصف
0x01Newجلسة جديدة (تتضمن عنوان الهدف)
0x02Keepمتابعة جلسة موجودة (البيانات تتبع)
0x03Endالجلسة مُغلقة
0x04KeepAliveإبقاء الاتصال حيًا (بدون بيانات)

أعلام الخيار

البتالاسمالوصف
0x01OptionDataالإطار يحتوي على حمولة بيانات
0x02OptionErrorالجلسة انتهت بخطأ

شبكة الهدف

القيمةالشبكة
0x01TCP
0x02UDP

تنسيق العنوان

Port:     2 bytes big-endian
AddrType: 0x01=IPv4, 0x02=Domain, 0x03=IPv6
Address:  4B (IPv4), 1B+NB (domain len+domain), 16B (IPv6)

جانب العميل

ClientManager

نقطة الدخول لـ Mux على جانب المخرج:

go
type ClientManager struct {
    Enabled bool
    Picker  WorkerPicker
}

func (m *ClientManager) Dispatch(ctx, link) error {
    for i := 0; i < 16; i++ {
        worker, _ := m.Picker.PickAvailable()
        if worker.Dispatch(ctx, link) {
            return nil
        }
    }
    return errors.New("unable to find an available mux client")
}

ClientWorker

كل ClientWorker يُدير اتصال نقل واحد مع جلسات متعددة:

go
type ClientWorker struct {
    sessionManager *SessionManager  // tracks active sessions
    link           transport.Link   // transport connection
    done           *done.Instance
}

يكون العامل "ممتلئًا" عندما يصل إلى الحد الأقصى للجلسات المتزامنة (قابل للتهيئة، الافتراضي ~128).

IncrementalWorkerPicker

يُنشئ عمالاً جددًا عند الحاجة:

go
func (p *IncrementalWorkerPicker) PickAvailable() (*ClientWorker, error) {
    p.access.Lock()
    defer p.access.Unlock()

    idx := p.findAvailable()
    if idx >= 0 {
        return p.workers[idx], nil
    }

    // All workers full: create a new one
    worker, err := p.Factory.Create()
    p.workers = append(p.workers, worker)
    return worker, nil
}

ClientWorker.Dispatch

go
func (m *ClientWorker) Dispatch(ctx, link) bool {
    if m.IsFull() || m.Closed() {
        return false
    }

    // Allocate session
    s := m.sessionManager.Allocate()

    // Build frame metadata
    meta := FrameMetadata{
        SessionID:     s.ID,
        SessionStatus: SessionStatusNew,
        Target:        outbound.Target,
        Option:        OptionData,
    }

    // Write frame to transport
    // Copy data from link.Reader to transport (with framing)
    // Copy data from transport to link.Writer (with deframing)
}

جانب الخادم

ServerWorker

يفك تعدد الإرسال للإطارات الواردة ويوزع كل جلسة:

go
type ServerWorker struct {
    dispatcher     routing.Dispatcher
    link           *transport.Link
    sessionManager *SessionManager
}

func NewServerWorker(ctx, dispatcher, link) (*ServerWorker, error) {
    // Start worker goroutine
    go worker.run(ctx)
    return worker, nil
}

حلقة معالجة الإطارات

go
func (w *ServerWorker) run(ctx) {
    for {
        // Read frame metadata
        meta := new(FrameMetadata)
        meta.Unmarshal(reader)

        switch meta.SessionStatus {
        case SessionStatusNew:
            // Create new session
            // Dispatch via routing
            s := w.sessionManager.Add(meta.SessionID)
            link := dispatcher.Dispatch(ctx, meta.Target)
            // Start copying between session and link

        case SessionStatusKeep:
            // Write data to existing session
            s := w.sessionManager.Get(meta.SessionID)
            // Read data payload and write to session

        case SessionStatusEnd:
            // Close session
            w.sessionManager.Remove(meta.SessionID)

        case SessionStatusKeepAlive:
            // No-op, just prevents connection timeout
        }
    }
}

مدير الجلسات

go
type SessionManager struct {
    sessions map[uint16]*Session
    count    int
    closed   bool
}

type Session struct {
    input        buf.Reader    // data from transport
    output       buf.Writer    // data to transport
    parent       *SessionManager
    ID           uint16
    transferType protocol.TransferType
}

تأطير البيانات

عند كتابة بيانات لجلسة موجودة:

go
// Writer wraps each write in a mux frame:
frame = [2B meta_len][2B session_id][status=Keep][option=Data]
data  = [2B data_len][payload]

عند القراءة، يُحلّل القارئ الإطارات ويوجّه البيانات إلى الجلسة الصحيحة.

إعداد Mux

json
{
  "outbounds": [{
    "mux": {
      "enabled": true,
      "concurrency": 8,    // max streams per connection
      "xudpConcurrency": 16,
      "xudpProxyUDP443": "reject"
    }
  }]
}

ملاحظات التنفيذ

  1. مساحة معرف الجلسة: 16-بت، لذا الحد الأقصى 65535 جلسة لكل اتصال. عمليًا، يُحدّد التزامن بحوالي ~128 للأداء.

  2. الحمل الزائد للتأطير: كل إطار بيانات يُضيف 6-8 بايت من البيانات الوصفية. لحزم UDP الصغيرة، هذا الحمل الزائد كبير.

  3. حجب رأس الصف: جميع الجلسات تتشارك اتصال TCP واحد. إذا توقفت جلسة واحدة (مثلاً، إعادة إرسال)، تتأثر جميع الجلسات. هذا هو المقايضة الأساسية لـ Mux.

  4. KeepAlive: إطار SessionStatusKeepAlive يمنع إغلاق اتصال النقل بسبب مهلة الخمول. يُرسل دوريًا عندما لا تتدفق بيانات.

  5. دورة حياة العامل: يُنشأ العمال عند الحاجة ويُنظّفون دوريًا (كل 30 ثانية). يُغلق العامل عندما تنتهي جميع جلساته ولا تصل جلسات جديدة.

  6. كشف XUDP: معرف الجلسة 0 + شبكة UDP يشير إلى وضع XUDP. انظر بروتوكول XUDP للتفاصيل.

تحليل تقني لأغراض إعادة التنفيذ.