Skip to content

إحصائيات حركة المرور

يوفر نظام الإحصائيات الفرعي في Xray قياس حركة المرور القائم على العدّادات، وتتبع المستخدمين المتصلين، وقنوات النشر/الاشتراك لمراقبة الأحداث في الوقت الفعلي. يتكامل مع المُوزّع لقياس حركة المرور بشكل شفاف لكل مستخدم، ولكل وارد، ولكل صادر.

البنية

mermaid
flowchart TD
    subgraph Stats Manager
        M[Manager] --> C[Counters map]
        M --> OM[OnlineMaps map]
        M --> CH[Channels map]
    end

    subgraph Dispatcher Integration
        D[DefaultDispatcher] --> SSW[SizeStatWriter]
        SSW --> C
        D --> OI[OnlineMap.AddIP]
        OI --> OM
    end

    subgraph gRPC API
        SS[StatsService] --> M
    end

مدير الإحصائيات

الملف: app/stats/stats.go

المدير هو السجل المركزي لجميع كائنات الإحصائيات:

go
type Manager struct {
    access    sync.RWMutex
    counters  map[string]*Counter
    onlineMap map[string]*OnlineMap
    channels  map[string]*Channel
    running   bool
}

يُنفّذ واجهة stats.Manager مع الدوال التالية:

الدالةالوصف
RegisterCounter(name)إنشاء عدّاد جديد باسم محدد
UnregisterCounter(name)إزالة عدّاد
GetCounter(name)البحث عن عدّاد بالاسم
VisitCounters(visitor)التكرار على جميع العدّادات باستخدام دالة استدعاء
RegisterOnlineMap(name)إنشاء خريطة متصلين جديدة باسم محدد
GetOnlineMap(name)البحث عن خريطة متصلين
GetAllOnlineUsers()إرجاع جميع المستخدمين الذين لديهم عناوين IP نشطة
RegisterChannel(name)إنشاء قناة نشر/اشتراك جديدة
GetChannel(name)البحث عن قناة

العدّاد

الملف: app/stats/counter.go

عدّاد ذري خالٍ من الأقفال:

go
type Counter struct {
    value int64
}

func (c *Counter) Value() int64 {
    return atomic.LoadInt64(&c.value)
}

func (c *Counter) Set(newValue int64) int64 {
    return atomic.SwapInt64(&c.value, newValue)
}

func (c *Counter) Add(delta int64) int64 {
    return atomic.AddInt64(&c.value, delta)
}

تُرجع Set() القيمة القديمة (مفيدة لعمليات "القراءة وإعادة التعيين" عبر Set(0)). تُرجع Add() القيمة الجديدة بعد الإضافة.

SizeStatWriter

الملف: app/dispatcher/stats.go

الجسر بين مستوى البيانات ونظام الإحصائيات:

go
type SizeStatWriter struct {
    Counter stats.Counter
    Writer  buf.Writer
}

func (w *SizeStatWriter) WriteMultiBuffer(mb buf.MultiBuffer) error {
    w.Counter.Add(int64(mb.Len()))
    return w.Writer.WriteMultiBuffer(mb)
}

يغلّف buf.Writer ويضيف عدد البايتات لكل MultiBuffer إلى عدّاده قبل تمرير البيانات. هذا اعتراض بدون نسخ -- لا يتم تعديل البيانات.

التكامل مع المُوزّع

الملف: app/dispatcher/default.go

تقوم دالة DefaultDispatcher.getLink() بربط الإحصائيات:

go
func (d *DefaultDispatcher) getLink(ctx context.Context) (*transport.Link, *transport.Link) {
    uplinkReader, uplinkWriter := pipe.New(opt...)
    downlinkReader, downlinkWriter := pipe.New(opt...)

    inboundLink := &transport.Link{Reader: downlinkReader, Writer: uplinkWriter}
    outboundLink := &transport.Link{Reader: uplinkReader, Writer: downlinkWriter}

    if user != nil && len(user.Email) > 0 {
        p := d.policy.ForLevel(user.Level)

        if p.Stats.UserUplink {
            name := "user>>>" + user.Email + ">>>traffic>>>uplink"
            if c, _ := stats.GetOrRegisterCounter(d.stats, name); c != nil {
                inboundLink.Writer = &SizeStatWriter{Counter: c, Writer: inboundLink.Writer}
            }
        }

        if p.Stats.UserDownlink {
            name := "user>>>" + user.Email + ">>>traffic>>>downlink"
            if c, _ := stats.GetOrRegisterCounter(d.stats, name); c != nil {
                outboundLink.Writer = &SizeStatWriter{Counter: c, Writer: outboundLink.Writer}
            }
        }

        if p.Stats.UserOnline {
            name := "user>>>" + user.Email + ">>>online"
            if om, _ := stats.GetOrRegisterOnlineMap(d.stats, name); om != nil {
                om.AddIP(sessionInbounds.Source.Address.String())
            }
        }
    }
}

اصطلاح تسمية العدّادات

تتبع العدّادات اصطلاح تسلسلي بفاصل >>>:

النمطمثالالاتجاه
user>>>{email}>>>traffic>>>uplinkuser>>>user1@example.com>>>traffic>>>uplinkالعميل -> Xray
user>>>{email}>>>traffic>>>downlinkuser>>>user1@example.com>>>traffic>>>downlinkXray -> العميل
user>>>{email}>>>onlineuser>>>user1@example.com>>>onlineحالة الاتصال (OnlineMap)
inbound>>>{tag}>>>traffic>>>uplinkinbound>>>vmess-in>>>traffic>>>uplinkحركة صاعدة لكل وارد
inbound>>>{tag}>>>traffic>>>downlinkinbound>>>vmess-in>>>traffic>>>downlinkحركة هابطة لكل وارد
outbound>>>{tag}>>>traffic>>>uplinkoutbound>>>freedom>>>traffic>>>uplinkحركة صاعدة لكل صادر
outbound>>>{tag}>>>traffic>>>downlinkoutbound>>>freedom>>>traffic>>>downlinkحركة هابطة لكل صادر

تخدم دالة WrapLink() غرضًا مشابهًا لـ DispatchLink() (المستخدمة من الوكيل العكسي والمستدعين الداخليين الآخرين):

go
func WrapLink(ctx context.Context, policyManager policy.Manager, statsManager stats.Manager, link *transport.Link) *transport.Link {
    link.Reader = &buf.TimeoutWrapperReader{Reader: link.Reader}

    if user != nil && len(user.Email) > 0 {
        if p.Stats.UserUplink {
            link.Reader.(*buf.TimeoutWrapperReader).Counter = c
        }
        if p.Stats.UserDownlink {
            link.Writer = &SizeStatWriter{Counter: c, Writer: link.Writer}
        }
        // Online tracking...
    }
    return link
}

لاحظ عدم التماثل: في WrapLink، يتم تتبع الحركة الصاعدة على القارئ (عبر TimeoutWrapperReader.Counter)، بينما في getLink، يتم تتبع الحركة الصاعدة على الكاتب (عبر SizeStatWriter). وذلك لأن اتجاهات الرابط معكوسة بين المسارين.

OnlineMap

الملف: app/stats/online_map.go

يتتبع عناوين IP المتصلة حاليًا لمستخدم معين:

go
type OnlineMap struct {
    ipList        map[string]time.Time
    access        sync.RWMutex
    lastCleanup   time.Time
    cleanupPeriod time.Duration  // 10 seconds
}

السلوك الرئيسي:

  • AddIP(ip) يضيف أو يُحدّث الطابع الزمني لعنوان IP. يتخطى 127.0.0.1.
  • RemoveExpiredIPs() يزيل الإدخالات الأقدم من 20 ثانية
  • يتم تفعيل التنظيف بشكل كسول: فقط عند استدعاء AddIP() ومرور 10 ثوانٍ أو أكثر منذ آخر تنظيف
  • Count() يُرجع عدد عناوين IP النشطة حاليًا
  • List() يُرجع جميع سلاسل عناوين IP النشطة

انتهاء الصلاحية بـ 20 ثانية يعني أن عنوان IP يُعتبر "متصلاً" فقط إذا كان يرسل حركة مرور بنشاط خلال آخر 20 ثانية.

القناة (نشر/اشتراك)

الملف: app/stats/channel.go

قناة نشر-اشتراك لبث الأحداث في الوقت الفعلي:

go
type Channel struct {
    channel     chan channelMessage
    subscribers []chan interface{}
    access      sync.RWMutex
    closed      chan struct{}
    blocking    bool
    bufferSize  int
    subsLimit   int
}

أوضاع النشر:

  • حاجب: Publish() يحجب إذا كان مخزن القناة ممتلئًا (ينتظر المستهلك)
  • غير حاجب: إذا كان المخزن ممتلئًا، يُنشئ goroutine للتسليم بشكل غير متزامن

أوضاع البث:

  • حاجب: البث لكل مشترك يحجب إذا كانت قناة المشترك ممتلئة
  • غير حاجب: إذا كانت قناة المشترك ممتلئة، يُنشئ goroutine لكل مشترك

القناة الافتراضية غير حاجبة بحجم مخزن 64.

دورة الحياة:

go
func (c *Channel) Start() error {
    c.closed = make(chan struct{})
    go func() {
        for {
            select {
            case pub := <-c.channel:
                for _, sub := range c.Subscribers() {
                    pub.broadcastNonBlocking(sub)
                }
            case <-c.closed:
                // Unsubscribe all and close
                return
            }
        }
    }()
}

التفعيل القائم على السياسات

يتم التحكم في جمع الإحصائيات من خلال نظام السياسات. كل مستوى مستخدم له إعدادات:

go
type Stats struct {
    UserUplink   bool
    UserDownlink bool
    UserOnline   bool
}

يجب أن يكون قسم stats موجودًا في الإعدادات (حتى لو كان فارغًا) لتفعيل مدير الإحصائيات:

json
{
    "stats": {},
    "policy": {
        "levels": {
            "0": {
                "statsUserUplink": true,
                "statsUserDownlink": true,
                "statsUserOnline": true
            }
        }
    }
}

الاستعلام عن الإحصائيات

عبر خدمة StatsService في gRPC:

bash
# Get a single counter
grpcurl -d '{"name": "user>>>user1@example.com>>>traffic>>>uplink"}' \
    localhost:10085 xray.app.stats.command.StatsService/GetStats

# Get and reset a counter
grpcurl -d '{"name": "...", "reset": true}' \
    localhost:10085 xray.app.stats.command.StatsService/GetStats

# Query by pattern
grpcurl -d '{"pattern": "user>>>user1"}' \
    localhost:10085 xray.app.stats.command.StatsService/QueryStats

# Get online user count
grpcurl -d '{"name": "user>>>user1@example.com>>>online"}' \
    localhost:10085 xray.app.stats.command.StatsService/GetStatsOnline

# Get all online users
grpcurl localhost:10085 xray.app.stats.command.StatsService/GetAllOnlineUsers

ملاحظات التنفيذ

  • تستخدم العدّادات عمليات sync/atomic حصريًا -- بدون أقفال mutex للقراءة/الكتابة. هذا يعني أن عمليات العدّاد خالية من الأقفال وسريعة للغاية (تعليمة CAS واحدة على معظم البنى).

  • الدالة المساعدة GetOrRegisterCounter() (في features/stats) تنشئ عدّادًا إذا لم يكن موجودًا، مما يتجنب حالات السباق بين فحص المُوزّع لوجود عدّاد وتسجيله.

  • تقوم Manager.GetAllOnlineUsers() بالتكرار على جميع خرائط المتصلين وتُرجع المستخدمين الذين لديهم عنوان IP واحد على الأقل غير منتهي الصلاحية. تستخدم قفل كتابة (وليس قفل قراءة) لأن IpTimeMap() قد تُفعّل التنظيف داخليًا.

  • تتراكم عدّادات حركة المرور بالبايتات بشكل لا نهائي حتى يتم إعادة تعيينها صراحةً عبر Set(0). أمر gRPC GetStats مع reset: true يقرأ ويُعيد التعيين ذريًا.

  • يحسب SizeStatWriter إجمالي البايتات في MultiBuffer (مجموع أطوال جميع المخازن)، الذي يمثل حجم الحمولة بعد التخزين الداخلي في Xray لكن قبل ترميز السلك.

  • خرائط المتصلين خاصة بكل مستخدم، وليس بكل اتصال. الاتصالات المتعددة من نفس عنوان IP تُحدّث نفس الطابع الزمني. انتهاء الصلاحية بـ 20 ثانية هو تقدير تجريبي: إذا أصبح اتصال المستخدم خاملاً لمدة 20 ثانية، يتم إزالته من قائمة المتصلين.

تحليل تقني لأغراض إعادة التنفيذ.