Skip to content

نظام الأنابيب والمخازن المؤقتة

يشكل نظاما الأنابيب والمخازن المؤقتة العمود الفقري لنقل البيانات في Xray-core. كل بايت يمر عبر الوكيل يتدفق عبر هذه التجريدات.

المخزن المؤقت (common/buf/)

هيكل Buffer

go
// common/buf/buffer.go
type Buffer struct {
    v         []byte          // شريحة البايتات الأساسية
    start     int32           // مؤشر القراءة
    end       int32           // مؤشر الكتابة
    ownership ownership       // مُدار، غير مُدار، أو bytespool
    UDP       *net.Destination // وجهة UDP لكل حزمة
}

الخصائص الرئيسية:

  • الحجم القياسي: 8192 بايت (8KB)
  • مُجمَّع: المخازن المؤقتة المُدارة يُعاد تدويرها عبر sync.Pool
  • مبني على المؤشرات: start..end يُحدد نافذة البيانات النشطة
  • بيانات UDP الوصفية: حقل UDP يحمل وجهة كل حزمة لوكالة UDP

تجمع المخازن المؤقتة

go
const Size = 8192
var pool = bytespool.GetPool(Size)

func New() *Buffer {
    buf := pool.Get().([]byte)
    return &Buffer{v: buf[:Size]}
}

func (b *Buffer) Release() {
    if cap(b.v) == Size {
        pool.Put(b.v)
    }
}

للتخصيصات الأكبر، يوفر bytespool تجمعات مبنية على فئات الحجم:

go
// common/bytespool/pool.go
// أحجام التجمع: 1K، 2K، 4K، 8K، 16K، 32K، 64K، 128K، ...
func Alloc(size int32) []byte  // التخصيص من التجمع
func Free(b []byte)            // الإعادة إلى التجمع

MultiBuffer

MultiBuffer هي شريحة من *Buffer، تُستخدم لعمليات الدُفعات:

go
type MultiBuffer []*Buffer

func (mb MultiBuffer) Len() int32      // إجمالي البايتات عبر جميع المخازن المؤقتة
func (mb MultiBuffer) IsEmpty() bool
func (mb MultiBuffer) Copy(b []byte) int // نسخ إلى شريحة بايتات مسطحة

واجهات Reader و Writer

go
type Reader interface {
    ReadMultiBuffer() (MultiBuffer, error)
}

type Writer interface {
    WriteMultiBuffer(MultiBuffer) error
}

type TimeoutReader interface {
    Reader
    ReadMultiBufferTimeout(time.Duration) (MultiBuffer, error)
}

buf.Copy — حلقة النقل الأساسية

تقريباً كل عمليات نقل البيانات تستخدم buf.Copy():

go
// common/buf/copy.go
func Copy(reader Reader, writer Writer, options ...CopyOption) error {
    for {
        buffer, err := reader.ReadMultiBuffer()
        if !buffer.IsEmpty() {
            for _, opt := range options {
                opt(&optHolder)  // مثلاً UpdateActivity(timer)
            }
            if werr := writer.WriteMultiBuffer(buffer); werr != nil {
                return werr
            }
        }
        if err != nil {
            return err  // io.EOF = اكتمال طبيعي
        }
    }
}

خيارات النسخ:

  • UpdateActivity(timer) — إعادة تعيين مهلة الخمول مع كل عملية نقل
  • CountSize(counter) — حساب البايتات المنقولة

BufferedWriter

BufferedWriter يُجمِّع الكتابات الصغيرة:

go
type BufferedWriter struct {
    writer  Writer
    buffer  *Buffer
    buffered bool
}

func (w *BufferedWriter) WriteMultiBuffer(mb MultiBuffer) error {
    if w.buffered {
        // التراكم في المخزن المؤقت
        // التفريغ عندما يمتلئ المخزن
    } else {
        // تمرير مباشر إلى الكاتب الأساسي
    }
}

func (w *BufferedWriter) SetBuffered(b bool) error {
    if !b && w.buffer != nil {
        // تفريغ البيانات المُخزَّنة
    }
}

يُستخدم هذا أثناء ترميز رأس البروتوكول: تخزين الرأس مؤقتاً، ثم تفريغه مع حمولة البيانات الأولى، لضمان إرسالهما في مقطع TCP واحد.

الأنبوب (transport/pipe/)

يربط الأنبوب بين الوارد والصادر مع تخزين مؤقت واعٍ بالضغط العكسي.

إنشاء أنبوب

go
// transport/pipe/pipe.go
func New(opts ...Option) (*Reader, *Writer) {
    p := &pipe{
        readSignal:  signal.NewNotifier(),
        writeSignal: signal.NewNotifier(),
        done:        done.New(),
        errChan:     make(chan error, 1),
        option: pipeOption{limit: -1},
    }
    return &Reader{pipe: p}, &Writer{pipe: p}
}

الخيارات:

  • WithSizeLimit(limit) — عتبة الضغط العكسي بالبايتات (من السياسة)
  • WithoutSizeLimit() — مخزن مؤقت بلا حدود (-1)
  • DiscardOverflow() — إسقاط الكتابات عند الامتلاء بدلاً من الحجز

الحالة الداخلية للأنبوب

go
// transport/pipe/impl.go
type pipe struct {
    readSignal  *signal.Notifier  // إشارة للقارئ بتوفر البيانات
    writeSignal *signal.Notifier  // إشارة للكاتب بتوفر مساحة
    done        *done.Instance    // الأنبوب مُغلق
    errChan     chan error         // خطأ الإغلاق
    option      pipeOption
    data        buf.MultiBuffer   // البيانات المُخزَّنة مؤقتاً
    state       int32             // نشط، مُغلق، أو خطأ
}

مسار الكتابة

go
func (p *pipe) WriteMultiBuffer(mb buf.MultiBuffer) error {
    for {
        // التحقق من إغلاق الأنبوب
        if p.done.Done() { return io.ErrClosedPipe }

        // التحقق من حد الحجم
        if p.option.limit >= 0 && p.data.Len() >= p.option.limit {
            // حجز: الانتظار حتى يستهلك القارئ البيانات
            // أو الإسقاط إذا كان DiscardOverflow مُعيَّناً
            p.writeSignal.Wait()
            continue
        }

        // الإلحاق بالمخزن المؤقت
        p.data = append(p.data, mb...)
        p.readSignal.Signal()  // إيقاظ القارئ
        return nil
    }
}

مسار القراءة

go
func (p *pipe) ReadMultiBuffer() (buf.MultiBuffer, error) {
    for {
        if !p.data.IsEmpty() {
            mb := p.data
            p.data = nil
            p.writeSignal.Signal()  // إيقاظ الكاتب المحجوز
            return mb, nil
        }
        if p.done.Done() {
            return nil, io.EOF
        }
        p.readSignal.Wait()  // الحجز حتى توفر البيانات
    }
}

func (p *pipe) ReadMultiBufferTimeout(d time.Duration) (buf.MultiBuffer, error) {
    // نفس الشيء لكن مع مهلة عبر select + timer
}

تدفق الضغط العكسي

mermaid
sequenceDiagram
    participant Inbound as وكيل الوارد
    participant PW as كاتب الأنبوب
    participant Pipe as مخزن الأنبوب المؤقت
    participant PR as قارئ الأنبوب
    participant Outbound as وكيل الصادر

    Inbound->>PW: WriteMultiBuffer(data)
    PW->>Pipe: الإلحاق بالمخزن المؤقت

    alt المخزن المؤقت تحت الحد
        PW->>PR: إشارة (بيانات متوفرة)
        PR->>Outbound: ReadMultiBuffer()
        Outbound->>Outbound: التوجيه إلى البعيد
        PR->>PW: إشارة (مساحة متوفرة)
    else المخزن المؤقت عند الحد
        PW->>PW: حجز (انتظار المساحة)
        Note over PW: ضغط عكسي!<br/>الوارد يتباطأ
        PR->>Outbound: ReadMultiBuffer()
        PR->>PW: إشارة (مساحة متوفرة)
        PW->>PW: استئناف الكتابة
    end

حدود حجم الأنبوب (السياسة)

تأتي أحجام المخازن المؤقتة من نظام السياسات:

go
// features/policy/policy.go
type Buffer struct {
    PerConnection int32  // حد حجم الأنبوب لكل اتصال
}

// مُهيأ لكل مستوى مستخدم:
// المستوى 0: 10240 بايت (10KB)
// المستوى 1+: قابل للتهيئة

من السياق:

go
func OptionsFromContext(ctx context.Context) []Option {
    bp := policy.BufferPolicyFromContext(ctx)
    if bp.PerConnection >= 0 {
        return []Option{WithSizeLimit(bp.PerConnection)}
    }
    return []Option{WithoutSizeLimit()}
}

رابط النقل

transport.Link يُقرن قارئاً وكاتباً:

go
// transport/link.go
type Link struct {
    Reader buf.Reader
    Writer buf.Writer
}

ينشئ المُوزِّع رابطَي Link مترابطَين:

InboundLink:                      OutboundLink:
  Reader = downlinkPipeReader       Reader = uplinkPipeReader
  Writer = uplinkPipeWriter         Writer = downlinkPipeWriter

البيانات المكتوبة إلى InboundLink.Writer تُقرأ من OutboundLink.Reader (الرفع). البيانات المكتوبة إلى OutboundLink.Writer تُقرأ من InboundLink.Reader (التنزيل).

نظام الإشارات (common/signal/)

Notifier

إشارة غير حاجزة لإيقاظ الـ goroutines:

go
type Notifier struct {
    c chan struct{}
}

func (n *Notifier) Signal() {
    select {
    case n.c <- struct{}{}: // تم إرسال الإشارة
    default: // أُشير مسبقاً، تخطي
    }
}

func (n *Notifier) Wait() <-chan struct{} {
    return n.c
}

ActivityTimer

يتتبع نشاط الاتصال لمهلة الخمول:

go
func CancelAfterInactivity(ctx context.Context, cancel func(), timeout time.Duration) *ActivityTimer

func (t *ActivityTimer) SetTimeout(timeout time.Duration)  // تغيير المهلة
// يُستدعى بواسطة buf.Copy مع خيار UpdateActivity

مع كل عملية نقل بيانات، يُعيد UpdateActivity تعيين المؤقت. إذا لم يكن هناك نشاط طوال مدة المهلة، يتم استدعاء cancel()، مما يُنهي كلا الـ goroutine-ين (الرفع + التنزيل).

ملاحظات التنفيذ

  1. تجميع المخازن المؤقتة أمر أساسي: بدونه، ضغط جمع القمامة من ملايين التخصيصات بحجم 8KB يقتل الأداء. استخدم ما يعادل sync.Pool.

  2. MultiBuffer يُمكِّن الإدخال/الإخراج بالدُفعات: القراءة/الكتابة لعدة أجزاء مرة واحدة تُقلل عبء استدعاءات النظام. استدعاء واحد لـ ReadMultiBuffer() قد يُرجع عدة مخازن مؤقتة.

  3. الضغط العكسي يمنع نفاد الذاكرة: بدون حدود حجم الأنبوب، المُرسِل السريع + المُستقبِل البطيء يسبب نمواً غير محدود في الذاكرة. الحد الافتراضي 10KB صغير عمداً.

  4. مبني على الإشارات مقابل مبني على القنوات: Notifier هو إشارة غير حاجزة (دلالات trySignal). هذا يتجنب تسرب الـ goroutines عندما يتسابق الطرفان على الإشارة.

  5. Buffer.UDP حرج لـ UDP: كل مخزن مؤقت يمكن أن يحمل عنوان وجهة مختلف. هكذا يتعامل XUDP و TUN مع التوجيه لكل حزمة.

  6. BufferedWriter لتجميع الرؤوس: قم دائماً بتخزين رأس البروتوكول + الحمولة الأولى معاً مؤقتاً. إرسالهما بشكل منفصل يكشف حجم الرأس لتحليل حركة المرور.

تحليل تقني لأغراض إعادة التنفيذ.