نظام الأنابيب والمخازن المؤقتة
يشكل نظاما الأنابيب والمخازن المؤقتة العمود الفقري لنقل البيانات في Xray-core. كل بايت يمر عبر الوكيل يتدفق عبر هذه التجريدات.
المخزن المؤقت (common/buf/)
هيكل Buffer
// common/buf/buffer.go
type Buffer struct {
v []byte // شريحة البايتات الأساسية
start int32 // مؤشر القراءة
end int32 // مؤشر الكتابة
ownership ownership // مُدار، غير مُدار، أو bytespool
UDP *net.Destination // وجهة UDP لكل حزمة
}الخصائص الرئيسية:
- الحجم القياسي: 8192 بايت (8KB)
- مُجمَّع: المخازن المؤقتة المُدارة يُعاد تدويرها عبر
sync.Pool - مبني على المؤشرات:
start..endيُحدد نافذة البيانات النشطة - بيانات UDP الوصفية: حقل
UDPيحمل وجهة كل حزمة لوكالة UDP
تجمع المخازن المؤقتة
const Size = 8192
var pool = bytespool.GetPool(Size)
func New() *Buffer {
buf := pool.Get().([]byte)
return &Buffer{v: buf[:Size]}
}
func (b *Buffer) Release() {
if cap(b.v) == Size {
pool.Put(b.v)
}
}للتخصيصات الأكبر، يوفر bytespool تجمعات مبنية على فئات الحجم:
// common/bytespool/pool.go
// أحجام التجمع: 1K، 2K، 4K، 8K، 16K، 32K، 64K، 128K، ...
func Alloc(size int32) []byte // التخصيص من التجمع
func Free(b []byte) // الإعادة إلى التجمعMultiBuffer
MultiBuffer هي شريحة من *Buffer، تُستخدم لعمليات الدُفعات:
type MultiBuffer []*Buffer
func (mb MultiBuffer) Len() int32 // إجمالي البايتات عبر جميع المخازن المؤقتة
func (mb MultiBuffer) IsEmpty() bool
func (mb MultiBuffer) Copy(b []byte) int // نسخ إلى شريحة بايتات مسطحةواجهات Reader و Writer
type Reader interface {
ReadMultiBuffer() (MultiBuffer, error)
}
type Writer interface {
WriteMultiBuffer(MultiBuffer) error
}
type TimeoutReader interface {
Reader
ReadMultiBufferTimeout(time.Duration) (MultiBuffer, error)
}buf.Copy — حلقة النقل الأساسية
تقريباً كل عمليات نقل البيانات تستخدم buf.Copy():
// common/buf/copy.go
func Copy(reader Reader, writer Writer, options ...CopyOption) error {
for {
buffer, err := reader.ReadMultiBuffer()
if !buffer.IsEmpty() {
for _, opt := range options {
opt(&optHolder) // مثلاً UpdateActivity(timer)
}
if werr := writer.WriteMultiBuffer(buffer); werr != nil {
return werr
}
}
if err != nil {
return err // io.EOF = اكتمال طبيعي
}
}
}خيارات النسخ:
UpdateActivity(timer)— إعادة تعيين مهلة الخمول مع كل عملية نقلCountSize(counter)— حساب البايتات المنقولة
BufferedWriter
BufferedWriter يُجمِّع الكتابات الصغيرة:
type BufferedWriter struct {
writer Writer
buffer *Buffer
buffered bool
}
func (w *BufferedWriter) WriteMultiBuffer(mb MultiBuffer) error {
if w.buffered {
// التراكم في المخزن المؤقت
// التفريغ عندما يمتلئ المخزن
} else {
// تمرير مباشر إلى الكاتب الأساسي
}
}
func (w *BufferedWriter) SetBuffered(b bool) error {
if !b && w.buffer != nil {
// تفريغ البيانات المُخزَّنة
}
}يُستخدم هذا أثناء ترميز رأس البروتوكول: تخزين الرأس مؤقتاً، ثم تفريغه مع حمولة البيانات الأولى، لضمان إرسالهما في مقطع TCP واحد.
الأنبوب (transport/pipe/)
يربط الأنبوب بين الوارد والصادر مع تخزين مؤقت واعٍ بالضغط العكسي.
إنشاء أنبوب
// transport/pipe/pipe.go
func New(opts ...Option) (*Reader, *Writer) {
p := &pipe{
readSignal: signal.NewNotifier(),
writeSignal: signal.NewNotifier(),
done: done.New(),
errChan: make(chan error, 1),
option: pipeOption{limit: -1},
}
return &Reader{pipe: p}, &Writer{pipe: p}
}الخيارات:
WithSizeLimit(limit)— عتبة الضغط العكسي بالبايتات (من السياسة)WithoutSizeLimit()— مخزن مؤقت بلا حدود (-1)DiscardOverflow()— إسقاط الكتابات عند الامتلاء بدلاً من الحجز
الحالة الداخلية للأنبوب
// transport/pipe/impl.go
type pipe struct {
readSignal *signal.Notifier // إشارة للقارئ بتوفر البيانات
writeSignal *signal.Notifier // إشارة للكاتب بتوفر مساحة
done *done.Instance // الأنبوب مُغلق
errChan chan error // خطأ الإغلاق
option pipeOption
data buf.MultiBuffer // البيانات المُخزَّنة مؤقتاً
state int32 // نشط، مُغلق، أو خطأ
}مسار الكتابة
func (p *pipe) WriteMultiBuffer(mb buf.MultiBuffer) error {
for {
// التحقق من إغلاق الأنبوب
if p.done.Done() { return io.ErrClosedPipe }
// التحقق من حد الحجم
if p.option.limit >= 0 && p.data.Len() >= p.option.limit {
// حجز: الانتظار حتى يستهلك القارئ البيانات
// أو الإسقاط إذا كان DiscardOverflow مُعيَّناً
p.writeSignal.Wait()
continue
}
// الإلحاق بالمخزن المؤقت
p.data = append(p.data, mb...)
p.readSignal.Signal() // إيقاظ القارئ
return nil
}
}مسار القراءة
func (p *pipe) ReadMultiBuffer() (buf.MultiBuffer, error) {
for {
if !p.data.IsEmpty() {
mb := p.data
p.data = nil
p.writeSignal.Signal() // إيقاظ الكاتب المحجوز
return mb, nil
}
if p.done.Done() {
return nil, io.EOF
}
p.readSignal.Wait() // الحجز حتى توفر البيانات
}
}
func (p *pipe) ReadMultiBufferTimeout(d time.Duration) (buf.MultiBuffer, error) {
// نفس الشيء لكن مع مهلة عبر select + timer
}تدفق الضغط العكسي
sequenceDiagram
participant Inbound as وكيل الوارد
participant PW as كاتب الأنبوب
participant Pipe as مخزن الأنبوب المؤقت
participant PR as قارئ الأنبوب
participant Outbound as وكيل الصادر
Inbound->>PW: WriteMultiBuffer(data)
PW->>Pipe: الإلحاق بالمخزن المؤقت
alt المخزن المؤقت تحت الحد
PW->>PR: إشارة (بيانات متوفرة)
PR->>Outbound: ReadMultiBuffer()
Outbound->>Outbound: التوجيه إلى البعيد
PR->>PW: إشارة (مساحة متوفرة)
else المخزن المؤقت عند الحد
PW->>PW: حجز (انتظار المساحة)
Note over PW: ضغط عكسي!<br/>الوارد يتباطأ
PR->>Outbound: ReadMultiBuffer()
PR->>PW: إشارة (مساحة متوفرة)
PW->>PW: استئناف الكتابة
endحدود حجم الأنبوب (السياسة)
تأتي أحجام المخازن المؤقتة من نظام السياسات:
// features/policy/policy.go
type Buffer struct {
PerConnection int32 // حد حجم الأنبوب لكل اتصال
}
// مُهيأ لكل مستوى مستخدم:
// المستوى 0: 10240 بايت (10KB)
// المستوى 1+: قابل للتهيئةمن السياق:
func OptionsFromContext(ctx context.Context) []Option {
bp := policy.BufferPolicyFromContext(ctx)
if bp.PerConnection >= 0 {
return []Option{WithSizeLimit(bp.PerConnection)}
}
return []Option{WithoutSizeLimit()}
}رابط النقل
transport.Link يُقرن قارئاً وكاتباً:
// transport/link.go
type Link struct {
Reader buf.Reader
Writer buf.Writer
}ينشئ المُوزِّع رابطَي Link مترابطَين:
InboundLink: OutboundLink:
Reader = downlinkPipeReader Reader = uplinkPipeReader
Writer = uplinkPipeWriter Writer = downlinkPipeWriterالبيانات المكتوبة إلى InboundLink.Writer تُقرأ من OutboundLink.Reader (الرفع). البيانات المكتوبة إلى OutboundLink.Writer تُقرأ من InboundLink.Reader (التنزيل).
نظام الإشارات (common/signal/)
Notifier
إشارة غير حاجزة لإيقاظ الـ goroutines:
type Notifier struct {
c chan struct{}
}
func (n *Notifier) Signal() {
select {
case n.c <- struct{}{}: // تم إرسال الإشارة
default: // أُشير مسبقاً، تخطي
}
}
func (n *Notifier) Wait() <-chan struct{} {
return n.c
}ActivityTimer
يتتبع نشاط الاتصال لمهلة الخمول:
func CancelAfterInactivity(ctx context.Context, cancel func(), timeout time.Duration) *ActivityTimer
func (t *ActivityTimer) SetTimeout(timeout time.Duration) // تغيير المهلة
// يُستدعى بواسطة buf.Copy مع خيار UpdateActivityمع كل عملية نقل بيانات، يُعيد UpdateActivity تعيين المؤقت. إذا لم يكن هناك نشاط طوال مدة المهلة، يتم استدعاء cancel()، مما يُنهي كلا الـ goroutine-ين (الرفع + التنزيل).
ملاحظات التنفيذ
تجميع المخازن المؤقتة أمر أساسي: بدونه، ضغط جمع القمامة من ملايين التخصيصات بحجم 8KB يقتل الأداء. استخدم ما يعادل
sync.Pool.MultiBuffer يُمكِّن الإدخال/الإخراج بالدُفعات: القراءة/الكتابة لعدة أجزاء مرة واحدة تُقلل عبء استدعاءات النظام. استدعاء واحد لـ
ReadMultiBuffer()قد يُرجع عدة مخازن مؤقتة.الضغط العكسي يمنع نفاد الذاكرة: بدون حدود حجم الأنبوب، المُرسِل السريع + المُستقبِل البطيء يسبب نمواً غير محدود في الذاكرة. الحد الافتراضي 10KB صغير عمداً.
مبني على الإشارات مقابل مبني على القنوات:
Notifierهو إشارة غير حاجزة (دلالات trySignal). هذا يتجنب تسرب الـ goroutines عندما يتسابق الطرفان على الإشارة.Buffer.UDP حرج لـ UDP: كل مخزن مؤقت يمكن أن يحمل عنوان وجهة مختلف. هكذا يتعامل XUDP و TUN مع التوجيه لكل حزمة.
BufferedWriter لتجميع الرؤوس: قم دائماً بتخزين رأس البروتوكول + الحمولة الأولى معاً مؤقتاً. إرسالهما بشكل منفصل يكشف حجم الرأس لتحليل حركة المرور.