إحصائيات حركة المرور
يوفر نظام الإحصائيات الفرعي في Xray قياس حركة المرور القائم على العدّادات، وتتبع المستخدمين المتصلين، وقنوات النشر/الاشتراك لمراقبة الأحداث في الوقت الفعلي. يتكامل مع المُوزّع لقياس حركة المرور بشكل شفاف لكل مستخدم، ولكل وارد، ولكل صادر.
البنية
flowchart TD
subgraph Stats Manager
M[Manager] --> C[Counters map]
M --> OM[OnlineMaps map]
M --> CH[Channels map]
end
subgraph Dispatcher Integration
D[DefaultDispatcher] --> SSW[SizeStatWriter]
SSW --> C
D --> OI[OnlineMap.AddIP]
OI --> OM
end
subgraph gRPC API
SS[StatsService] --> M
endمدير الإحصائيات
الملف: app/stats/stats.go
المدير هو السجل المركزي لجميع كائنات الإحصائيات:
type Manager struct {
access sync.RWMutex
counters map[string]*Counter
onlineMap map[string]*OnlineMap
channels map[string]*Channel
running bool
}يُنفّذ واجهة stats.Manager مع الدوال التالية:
| الدالة | الوصف |
|---|---|
RegisterCounter(name) | إنشاء عدّاد جديد باسم محدد |
UnregisterCounter(name) | إزالة عدّاد |
GetCounter(name) | البحث عن عدّاد بالاسم |
VisitCounters(visitor) | التكرار على جميع العدّادات باستخدام دالة استدعاء |
RegisterOnlineMap(name) | إنشاء خريطة متصلين جديدة باسم محدد |
GetOnlineMap(name) | البحث عن خريطة متصلين |
GetAllOnlineUsers() | إرجاع جميع المستخدمين الذين لديهم عناوين IP نشطة |
RegisterChannel(name) | إنشاء قناة نشر/اشتراك جديدة |
GetChannel(name) | البحث عن قناة |
العدّاد
الملف: app/stats/counter.go
عدّاد ذري خالٍ من الأقفال:
type Counter struct {
value int64
}
func (c *Counter) Value() int64 {
return atomic.LoadInt64(&c.value)
}
func (c *Counter) Set(newValue int64) int64 {
return atomic.SwapInt64(&c.value, newValue)
}
func (c *Counter) Add(delta int64) int64 {
return atomic.AddInt64(&c.value, delta)
}تُرجع Set() القيمة القديمة (مفيدة لعمليات "القراءة وإعادة التعيين" عبر Set(0)). تُرجع Add() القيمة الجديدة بعد الإضافة.
SizeStatWriter
الملف: app/dispatcher/stats.go
الجسر بين مستوى البيانات ونظام الإحصائيات:
type SizeStatWriter struct {
Counter stats.Counter
Writer buf.Writer
}
func (w *SizeStatWriter) WriteMultiBuffer(mb buf.MultiBuffer) error {
w.Counter.Add(int64(mb.Len()))
return w.Writer.WriteMultiBuffer(mb)
}يغلّف buf.Writer ويضيف عدد البايتات لكل MultiBuffer إلى عدّاده قبل تمرير البيانات. هذا اعتراض بدون نسخ -- لا يتم تعديل البيانات.
التكامل مع المُوزّع
الملف: app/dispatcher/default.go
تقوم دالة DefaultDispatcher.getLink() بربط الإحصائيات:
func (d *DefaultDispatcher) getLink(ctx context.Context) (*transport.Link, *transport.Link) {
uplinkReader, uplinkWriter := pipe.New(opt...)
downlinkReader, downlinkWriter := pipe.New(opt...)
inboundLink := &transport.Link{Reader: downlinkReader, Writer: uplinkWriter}
outboundLink := &transport.Link{Reader: uplinkReader, Writer: downlinkWriter}
if user != nil && len(user.Email) > 0 {
p := d.policy.ForLevel(user.Level)
if p.Stats.UserUplink {
name := "user>>>" + user.Email + ">>>traffic>>>uplink"
if c, _ := stats.GetOrRegisterCounter(d.stats, name); c != nil {
inboundLink.Writer = &SizeStatWriter{Counter: c, Writer: inboundLink.Writer}
}
}
if p.Stats.UserDownlink {
name := "user>>>" + user.Email + ">>>traffic>>>downlink"
if c, _ := stats.GetOrRegisterCounter(d.stats, name); c != nil {
outboundLink.Writer = &SizeStatWriter{Counter: c, Writer: outboundLink.Writer}
}
}
if p.Stats.UserOnline {
name := "user>>>" + user.Email + ">>>online"
if om, _ := stats.GetOrRegisterOnlineMap(d.stats, name); om != nil {
om.AddIP(sessionInbounds.Source.Address.String())
}
}
}
}اصطلاح تسمية العدّادات
تتبع العدّادات اصطلاح تسلسلي بفاصل >>>:
| النمط | مثال | الاتجاه |
|---|---|---|
user>>>{email}>>>traffic>>>uplink | user>>>user1@example.com>>>traffic>>>uplink | العميل -> Xray |
user>>>{email}>>>traffic>>>downlink | user>>>user1@example.com>>>traffic>>>downlink | Xray -> العميل |
user>>>{email}>>>online | user>>>user1@example.com>>>online | حالة الاتصال (OnlineMap) |
inbound>>>{tag}>>>traffic>>>uplink | inbound>>>vmess-in>>>traffic>>>uplink | حركة صاعدة لكل وارد |
inbound>>>{tag}>>>traffic>>>downlink | inbound>>>vmess-in>>>traffic>>>downlink | حركة هابطة لكل وارد |
outbound>>>{tag}>>>traffic>>>uplink | outbound>>>freedom>>>traffic>>>uplink | حركة صاعدة لكل صادر |
outbound>>>{tag}>>>traffic>>>downlink | outbound>>>freedom>>>traffic>>>downlink | حركة هابطة لكل صادر |
WrapLink
تخدم دالة WrapLink() غرضًا مشابهًا لـ DispatchLink() (المستخدمة من الوكيل العكسي والمستدعين الداخليين الآخرين):
func WrapLink(ctx context.Context, policyManager policy.Manager, statsManager stats.Manager, link *transport.Link) *transport.Link {
link.Reader = &buf.TimeoutWrapperReader{Reader: link.Reader}
if user != nil && len(user.Email) > 0 {
if p.Stats.UserUplink {
link.Reader.(*buf.TimeoutWrapperReader).Counter = c
}
if p.Stats.UserDownlink {
link.Writer = &SizeStatWriter{Counter: c, Writer: link.Writer}
}
// Online tracking...
}
return link
}لاحظ عدم التماثل: في WrapLink، يتم تتبع الحركة الصاعدة على القارئ (عبر TimeoutWrapperReader.Counter)، بينما في getLink، يتم تتبع الحركة الصاعدة على الكاتب (عبر SizeStatWriter). وذلك لأن اتجاهات الرابط معكوسة بين المسارين.
OnlineMap
الملف: app/stats/online_map.go
يتتبع عناوين IP المتصلة حاليًا لمستخدم معين:
type OnlineMap struct {
ipList map[string]time.Time
access sync.RWMutex
lastCleanup time.Time
cleanupPeriod time.Duration // 10 seconds
}السلوك الرئيسي:
AddIP(ip)يضيف أو يُحدّث الطابع الزمني لعنوان IP. يتخطى127.0.0.1.RemoveExpiredIPs()يزيل الإدخالات الأقدم من 20 ثانية- يتم تفعيل التنظيف بشكل كسول: فقط عند استدعاء
AddIP()ومرور 10 ثوانٍ أو أكثر منذ آخر تنظيف Count()يُرجع عدد عناوين IP النشطة حاليًاList()يُرجع جميع سلاسل عناوين IP النشطة
انتهاء الصلاحية بـ 20 ثانية يعني أن عنوان IP يُعتبر "متصلاً" فقط إذا كان يرسل حركة مرور بنشاط خلال آخر 20 ثانية.
القناة (نشر/اشتراك)
الملف: app/stats/channel.go
قناة نشر-اشتراك لبث الأحداث في الوقت الفعلي:
type Channel struct {
channel chan channelMessage
subscribers []chan interface{}
access sync.RWMutex
closed chan struct{}
blocking bool
bufferSize int
subsLimit int
}أوضاع النشر:
- حاجب:
Publish()يحجب إذا كان مخزن القناة ممتلئًا (ينتظر المستهلك) - غير حاجب: إذا كان المخزن ممتلئًا، يُنشئ goroutine للتسليم بشكل غير متزامن
أوضاع البث:
- حاجب: البث لكل مشترك يحجب إذا كانت قناة المشترك ممتلئة
- غير حاجب: إذا كانت قناة المشترك ممتلئة، يُنشئ goroutine لكل مشترك
القناة الافتراضية غير حاجبة بحجم مخزن 64.
دورة الحياة:
func (c *Channel) Start() error {
c.closed = make(chan struct{})
go func() {
for {
select {
case pub := <-c.channel:
for _, sub := range c.Subscribers() {
pub.broadcastNonBlocking(sub)
}
case <-c.closed:
// Unsubscribe all and close
return
}
}
}()
}التفعيل القائم على السياسات
يتم التحكم في جمع الإحصائيات من خلال نظام السياسات. كل مستوى مستخدم له إعدادات:
type Stats struct {
UserUplink bool
UserDownlink bool
UserOnline bool
}يجب أن يكون قسم stats موجودًا في الإعدادات (حتى لو كان فارغًا) لتفعيل مدير الإحصائيات:
{
"stats": {},
"policy": {
"levels": {
"0": {
"statsUserUplink": true,
"statsUserDownlink": true,
"statsUserOnline": true
}
}
}
}الاستعلام عن الإحصائيات
عبر خدمة StatsService في gRPC:
# Get a single counter
grpcurl -d '{"name": "user>>>user1@example.com>>>traffic>>>uplink"}' \
localhost:10085 xray.app.stats.command.StatsService/GetStats
# Get and reset a counter
grpcurl -d '{"name": "...", "reset": true}' \
localhost:10085 xray.app.stats.command.StatsService/GetStats
# Query by pattern
grpcurl -d '{"pattern": "user>>>user1"}' \
localhost:10085 xray.app.stats.command.StatsService/QueryStats
# Get online user count
grpcurl -d '{"name": "user>>>user1@example.com>>>online"}' \
localhost:10085 xray.app.stats.command.StatsService/GetStatsOnline
# Get all online users
grpcurl localhost:10085 xray.app.stats.command.StatsService/GetAllOnlineUsersملاحظات التنفيذ
تستخدم العدّادات عمليات
sync/atomicحصريًا -- بدون أقفال mutex للقراءة/الكتابة. هذا يعني أن عمليات العدّاد خالية من الأقفال وسريعة للغاية (تعليمة CAS واحدة على معظم البنى).الدالة المساعدة
GetOrRegisterCounter()(فيfeatures/stats) تنشئ عدّادًا إذا لم يكن موجودًا، مما يتجنب حالات السباق بين فحص المُوزّع لوجود عدّاد وتسجيله.تقوم
Manager.GetAllOnlineUsers()بالتكرار على جميع خرائط المتصلين وتُرجع المستخدمين الذين لديهم عنوان IP واحد على الأقل غير منتهي الصلاحية. تستخدم قفل كتابة (وليس قفل قراءة) لأنIpTimeMap()قد تُفعّل التنظيف داخليًا.تتراكم عدّادات حركة المرور بالبايتات بشكل لا نهائي حتى يتم إعادة تعيينها صراحةً عبر
Set(0). أمر gRPCGetStatsمعreset: trueيقرأ ويُعيد التعيين ذريًا.يحسب
SizeStatWriterإجمالي البايتات فيMultiBuffer(مجموع أطوال جميع المخازن)، الذي يمثل حجم الحمولة بعد التخزين الداخلي في Xray لكن قبل ترميز السلك.خرائط المتصلين خاصة بكل مستخدم، وليس بكل اتصال. الاتصالات المتعددة من نفس عنوان IP تُحدّث نفس الطابع الزمني. انتهاء الصلاحية بـ 20 ثانية هو تقدير تجريبي: إذا أصبح اتصال المستخدم خاملاً لمدة 20 ثانية، يتم إزالته من قائمة المتصلين.