المتحكم: واجهة gRPC API للإدارة أثناء التشغيل
المتحكم (Commander) هو خادم gRPC المدمج في Xray الذي يوفر واجهات برمجية لإدارة النظام أثناء التشغيل. يتيح إجراء تغييرات ديناميكية في الإعدادات، واسترجاع الإحصائيات، ومراقبة النظام دون الحاجة لإعادة تشغيل العملية.
البنية
flowchart TD
subgraph Commander
C[Commander] --> GS[grpc.Server]
GS --> HS[HandlerService]
GS --> SS[StatsService]
GS --> RS[RoutingService]
GS --> LS[LoggerService]
GS --> OS[ObservatoryService]
GS --> RF[ReflectionService]
end
subgraph Transport
GS -->|option A| OL[OutboundListener]
OL --> OH[Outbound Handler]
OH --> RT[Xray Routing]
GS -->|option B| TL[TCP Listener]
TL --> NW[Direct network]
end
subgraph Client
CLI[gRPC Client] --> RT
CLI2[gRPC Client] --> NW
endنواة المتحكم
الملف: app/commander/commander.go
type Commander struct {
sync.Mutex
server *grpc.Server
services []Service
ohm outbound.Manager
tag string
listen string
}واجهة الخدمة
// app/commander/service.go
type Service interface {
Register(*grpc.Server)
}يجب أن يلبي كل تنفيذ لخدمة gRPC هذه الواجهة لتسجيل معالجاته.
التهيئة
func NewCommander(ctx context.Context, config *Config) (*Commander, error) {
c := &Commander{tag: config.Tag, listen: config.Listen}
core.RequireFeatures(ctx, func(om outbound.Manager) {
c.ohm = om
})
for _, rawConfig := range config.Service {
config, _ := rawConfig.GetInstance() // TypedMessage -> proto.Message
rawService, _ := common.CreateObject(ctx, config) // proto.Message -> Service
service, _ := rawService.(Service)
c.services = append(c.services, service)
}
}يتم إنشاء كل خدمة من إعداد protobuf الخاص بها عبر سجل الإعدادات العام (common.CreateObject).
البدء: وضعان للنقل
func (c *Commander) Start() error {
c.server = grpc.NewServer()
for _, service := range c.services {
service.Register(c.server)
}
if len(c.listen) > 0 {
// Direct TCP listener mode
l, _ := net.Listen("tcp", c.listen)
go c.server.Serve(l)
return nil
}
// Outbound listener mode (through Xray routing)
listener := &OutboundListener{
buffer: make(chan net.Conn, 4),
done: done.New(),
}
go c.server.Serve(listener)
c.ohm.RemoveHandler(context.Background(), c.tag)
return c.ohm.AddHandler(context.Background(), &Outbound{
tag: c.tag,
listener: listener,
})
}الوضع الأول -- TCP المباشر (حقل listen مُعيَّن): يفتح مقبس TCP حقيقي. أبسط لكنه مكشوف على الشبكة.
الوضع الثاني -- المعالج الصادر (الافتراضي): ينشئ OutboundListener افتراضيًا ويسجّل معالج Outbound. يتصل عملاء gRPC عبر نظام الوارد/التوجيه في Xray.
OutboundListener
الملف: app/commander/outbound.go
مُستمع net.Listener مدعوم بقناة اتصالات:
type OutboundListener struct {
buffer chan net.Conn // capacity: 4
done *done.Instance
}
func (l *OutboundListener) Accept() (net.Conn, error) {
select {
case <-l.done.Wait():
return nil, errors.New("listen closed")
case c := <-l.buffer:
return c, nil
}
}المعالج الصادر
يحوّل روابط النقل في Xray إلى net.Conn لخادم gRPC:
type Outbound struct {
tag string
listener *OutboundListener
access sync.RWMutex
closed bool
}
func (co *Outbound) Dispatch(ctx context.Context, link *transport.Link) {
closeSignal := done.New()
c := cnc.NewConnection(
cnc.ConnectionInputMulti(link.Writer),
cnc.ConnectionOutputMulti(link.Reader),
cnc.ConnectionOnClose(closeSignal),
)
co.listener.add(c)
<-closeSignal.Wait() // Block until connection closes
}الخدمات المتاحة
StatsService
الملف: app/stats/command/command.go
type statsServer struct {
stats feature_stats.Manager
startTime time.Time
}دوال gRPC:
| الدالة | الوصف |
|---|---|
GetStats(name, reset) | الحصول على قيمة عدّاد واحد، مع إمكانية إعادة تعيينه |
QueryStats(pattern, reset) | الاستعلام عن جميع العدّادات المطابقة لنمط سلسلة فرعية |
GetSysStats() | إحصائيات النظام: وقت التشغيل، الـ goroutines، تخصيص الذاكرة، جامع القمامة |
GetStatsOnline(name) | الحصول على عدد المستخدمين المتصلين لخريطة مستخدم متصل |
GetStatsOnlineIpList(name) | الحصول على قائمة عناوين IP مع الطوابع الزمنية لخريطة متصل |
GetAllOnlineUsers() | عرض جميع المستخدمين الذين لديهم عناوين IP نشطة |
توفر دالة GetSysStats تشخيصات وقت التشغيل:
func (s *statsServer) GetSysStats(ctx context.Context, request *SysStatsRequest) (*SysStatsResponse, error) {
var rtm runtime.MemStats
runtime.ReadMemStats(&rtm)
return &SysStatsResponse{
Uptime: uint32(time.Since(s.startTime).Seconds()),
NumGoroutine: uint32(runtime.NumGoroutine()),
Alloc: rtm.Alloc,
TotalAlloc: rtm.TotalAlloc,
Sys: rtm.Sys,
Mallocs: rtm.Mallocs,
Frees: rtm.Frees,
LiveObjects: rtm.Mallocs - rtm.Frees,
NumGC: rtm.NumGC,
PauseTotalNs: rtm.PauseTotalNs,
}, nil
}لأغراض التوافق مع v2ray، يتم تسجيل StatsService مرتين -- تحت كل من xray.app.stats.command.StatsService و v2ray.core.app.stats.command.StatsService:
func (s *service) Register(server *grpc.Server) {
ss := NewStatsServer(s.statsManager)
RegisterStatsServiceServer(server, ss)
vCoreDesc := StatsService_ServiceDesc
vCoreDesc.ServiceName = "v2ray.core.app.stats.command.StatsService"
server.RegisterService(&vCoreDesc, ss)
}HandlerService
الحزمة: app/proxyman/command
يدير المعالجات الواردة والصادرة أثناء التشغيل:
- إضافة/إزالة معالجات واردة
- إضافة/إزالة معالجات صادرة
- تعديل إعدادات المعالجات الواردة
RoutingService
الحزمة: app/router/command
إدارة التوجيه أثناء التشغيل:
- اختبار قواعد التوجيه ضد سياقات محددة
- الاستعلام عن جدول التوجيه
LoggerService
الحزمة: app/log/command
- إعادة تشغيل المسجّل
- متابعة مخرجات السجل عبر التدفق المستمر
ObservatoryService
الحزمة: app/observatory/command
GetOutboundStatus(): يُرجع الحالة الصحية لجميع المعالجات الصادرة المراقَبة
ReflectionService
الملف: app/commander/service.go
يُفعّل انعكاس خادم gRPC للأدوات:
type reflectionService struct{}
func (r reflectionService) Register(s *grpc.Server) {
reflection.Register(s)
}الإعدادات
الملف: infra/conf/api.go
{
"api": {
"tag": "api",
"listen": "127.0.0.1:10085",
"services": [
"HandlerService",
"StatsService",
"RoutingService",
"LoggerService",
"ObservatoryService",
"ReflectionService"
]
}
}تقوم دالة Build() بربط أسماء الخدمات بإعدادات protobuf الخاصة بها:
func (c *APIConfig) Build() (*commander.Config, error) {
for _, s := range c.Services {
switch strings.ToLower(s) {
case "reflectionservice":
services = append(services, serial.ToTypedMessage(&commander.ReflectionConfig{}))
case "handlerservice":
services = append(services, serial.ToTypedMessage(&handlerservice.Config{}))
case "statsservice":
services = append(services, serial.ToTypedMessage(&statsservice.Config{}))
// ... etc
}
}
}عند استخدام وضع المعالج الصادر (بدون listen)، تحتاج إلى:
- وارد
dokodemo-doorموسوم بوسم API - قاعدة توجيه توجّه ذلك الوارد إلى المعالج الصادر لـ API
{
"inbounds": [{
"tag": "api-in",
"protocol": "dokodemo-door",
"port": 10085,
"settings": { "address": "127.0.0.1" }
}],
"routing": {
"rules": [{
"inboundTag": ["api-in"],
"outboundTag": "api"
}]
}
}ملاحظات التنفيذ
يُسجّل المتحكم نفسه عبر
common.RegisterConfig((*Config)(nil), ...)ويتطلبoutbound.Managerمن سجل الميزات.سعة مخزن
OutboundListenerهي 4. إذا كان خادم gRPC بطيئًا في القبول و4 اتصالات في قائمة الانتظار، يتم إغلاق الاتصالات الإضافية (إسقاطها) فورًا.تقوم دالة
Outbound.Dispatch()بالحجب على<-closeSignal.Wait()للحفاظ على رابط النقل حيًا طوال مدة اتصال gRPC.عند إيقاف المتحكم، يتم استدعاء
c.server.Stop()الذي ينهي بشكل قسري جميع تدفقات gRPC النشطة. لا يوجد إيقاف تدريجي.إنشاء الخدمات يستخدم مسار
common.CreateObject()العام، الذي يبحث عن نوع إعداد protobuf في السجل العام. هذا يعني أنه يمكن توسيع الخدمات بواسطة حزم خارجية تسجّل إعدادات جديدة.حقل
listenيُفعّل وضع TCP المباشر، وهو أبسط في الإعداد لكنه يتجاوز نظام التوجيه في Xray. في هذا الوضع، لا يتم تسجيل أي معالج صادر.