Skip to content

المتحكم: واجهة gRPC API للإدارة أثناء التشغيل

المتحكم (Commander) هو خادم gRPC المدمج في Xray الذي يوفر واجهات برمجية لإدارة النظام أثناء التشغيل. يتيح إجراء تغييرات ديناميكية في الإعدادات، واسترجاع الإحصائيات، ومراقبة النظام دون الحاجة لإعادة تشغيل العملية.

البنية

mermaid
flowchart TD
    subgraph Commander
        C[Commander] --> GS[grpc.Server]
        GS --> HS[HandlerService]
        GS --> SS[StatsService]
        GS --> RS[RoutingService]
        GS --> LS[LoggerService]
        GS --> OS[ObservatoryService]
        GS --> RF[ReflectionService]
    end

    subgraph Transport
        GS -->|option A| OL[OutboundListener]
        OL --> OH[Outbound Handler]
        OH --> RT[Xray Routing]

        GS -->|option B| TL[TCP Listener]
        TL --> NW[Direct network]
    end

    subgraph Client
        CLI[gRPC Client] --> RT
        CLI2[gRPC Client] --> NW
    end

نواة المتحكم

الملف: app/commander/commander.go

go
type Commander struct {
    sync.Mutex
    server   *grpc.Server
    services []Service
    ohm      outbound.Manager
    tag      string
    listen   string
}

واجهة الخدمة

go
// app/commander/service.go
type Service interface {
    Register(*grpc.Server)
}

يجب أن يلبي كل تنفيذ لخدمة gRPC هذه الواجهة لتسجيل معالجاته.

التهيئة

go
func NewCommander(ctx context.Context, config *Config) (*Commander, error) {
    c := &Commander{tag: config.Tag, listen: config.Listen}

    core.RequireFeatures(ctx, func(om outbound.Manager) {
        c.ohm = om
    })

    for _, rawConfig := range config.Service {
        config, _ := rawConfig.GetInstance()             // TypedMessage -> proto.Message
        rawService, _ := common.CreateObject(ctx, config) // proto.Message -> Service
        service, _ := rawService.(Service)
        c.services = append(c.services, service)
    }
}

يتم إنشاء كل خدمة من إعداد protobuf الخاص بها عبر سجل الإعدادات العام (common.CreateObject).

البدء: وضعان للنقل

go
func (c *Commander) Start() error {
    c.server = grpc.NewServer()
    for _, service := range c.services {
        service.Register(c.server)
    }

    if len(c.listen) > 0 {
        // Direct TCP listener mode
        l, _ := net.Listen("tcp", c.listen)
        go c.server.Serve(l)
        return nil
    }

    // Outbound listener mode (through Xray routing)
    listener := &OutboundListener{
        buffer: make(chan net.Conn, 4),
        done:   done.New(),
    }
    go c.server.Serve(listener)
    c.ohm.RemoveHandler(context.Background(), c.tag)
    return c.ohm.AddHandler(context.Background(), &Outbound{
        tag:      c.tag,
        listener: listener,
    })
}

الوضع الأول -- TCP المباشر (حقل listen مُعيَّن): يفتح مقبس TCP حقيقي. أبسط لكنه مكشوف على الشبكة.

الوضع الثاني -- المعالج الصادر (الافتراضي): ينشئ OutboundListener افتراضيًا ويسجّل معالج Outbound. يتصل عملاء gRPC عبر نظام الوارد/التوجيه في Xray.

OutboundListener

الملف: app/commander/outbound.go

مُستمع net.Listener مدعوم بقناة اتصالات:

go
type OutboundListener struct {
    buffer chan net.Conn  // capacity: 4
    done   *done.Instance
}

func (l *OutboundListener) Accept() (net.Conn, error) {
    select {
    case <-l.done.Wait():
        return nil, errors.New("listen closed")
    case c := <-l.buffer:
        return c, nil
    }
}

المعالج الصادر

يحوّل روابط النقل في Xray إلى net.Conn لخادم gRPC:

go
type Outbound struct {
    tag      string
    listener *OutboundListener
    access   sync.RWMutex
    closed   bool
}

func (co *Outbound) Dispatch(ctx context.Context, link *transport.Link) {
    closeSignal := done.New()
    c := cnc.NewConnection(
        cnc.ConnectionInputMulti(link.Writer),
        cnc.ConnectionOutputMulti(link.Reader),
        cnc.ConnectionOnClose(closeSignal),
    )
    co.listener.add(c)
    <-closeSignal.Wait()  // Block until connection closes
}

الخدمات المتاحة

StatsService

الملف: app/stats/command/command.go

go
type statsServer struct {
    stats     feature_stats.Manager
    startTime time.Time
}

دوال gRPC:

الدالةالوصف
GetStats(name, reset)الحصول على قيمة عدّاد واحد، مع إمكانية إعادة تعيينه
QueryStats(pattern, reset)الاستعلام عن جميع العدّادات المطابقة لنمط سلسلة فرعية
GetSysStats()إحصائيات النظام: وقت التشغيل، الـ goroutines، تخصيص الذاكرة، جامع القمامة
GetStatsOnline(name)الحصول على عدد المستخدمين المتصلين لخريطة مستخدم متصل
GetStatsOnlineIpList(name)الحصول على قائمة عناوين IP مع الطوابع الزمنية لخريطة متصل
GetAllOnlineUsers()عرض جميع المستخدمين الذين لديهم عناوين IP نشطة

توفر دالة GetSysStats تشخيصات وقت التشغيل:

go
func (s *statsServer) GetSysStats(ctx context.Context, request *SysStatsRequest) (*SysStatsResponse, error) {
    var rtm runtime.MemStats
    runtime.ReadMemStats(&rtm)
    return &SysStatsResponse{
        Uptime:       uint32(time.Since(s.startTime).Seconds()),
        NumGoroutine: uint32(runtime.NumGoroutine()),
        Alloc:        rtm.Alloc,
        TotalAlloc:   rtm.TotalAlloc,
        Sys:          rtm.Sys,
        Mallocs:      rtm.Mallocs,
        Frees:        rtm.Frees,
        LiveObjects:  rtm.Mallocs - rtm.Frees,
        NumGC:        rtm.NumGC,
        PauseTotalNs: rtm.PauseTotalNs,
    }, nil
}

لأغراض التوافق مع v2ray، يتم تسجيل StatsService مرتين -- تحت كل من xray.app.stats.command.StatsService و v2ray.core.app.stats.command.StatsService:

go
func (s *service) Register(server *grpc.Server) {
    ss := NewStatsServer(s.statsManager)
    RegisterStatsServiceServer(server, ss)
    vCoreDesc := StatsService_ServiceDesc
    vCoreDesc.ServiceName = "v2ray.core.app.stats.command.StatsService"
    server.RegisterService(&vCoreDesc, ss)
}

HandlerService

الحزمة: app/proxyman/command

يدير المعالجات الواردة والصادرة أثناء التشغيل:

  • إضافة/إزالة معالجات واردة
  • إضافة/إزالة معالجات صادرة
  • تعديل إعدادات المعالجات الواردة

RoutingService

الحزمة: app/router/command

إدارة التوجيه أثناء التشغيل:

  • اختبار قواعد التوجيه ضد سياقات محددة
  • الاستعلام عن جدول التوجيه

LoggerService

الحزمة: app/log/command

  • إعادة تشغيل المسجّل
  • متابعة مخرجات السجل عبر التدفق المستمر

ObservatoryService

الحزمة: app/observatory/command

  • GetOutboundStatus(): يُرجع الحالة الصحية لجميع المعالجات الصادرة المراقَبة

ReflectionService

الملف: app/commander/service.go

يُفعّل انعكاس خادم gRPC للأدوات:

go
type reflectionService struct{}

func (r reflectionService) Register(s *grpc.Server) {
    reflection.Register(s)
}

الإعدادات

الملف: infra/conf/api.go

json
{
    "api": {
        "tag": "api",
        "listen": "127.0.0.1:10085",
        "services": [
            "HandlerService",
            "StatsService",
            "RoutingService",
            "LoggerService",
            "ObservatoryService",
            "ReflectionService"
        ]
    }
}

تقوم دالة Build() بربط أسماء الخدمات بإعدادات protobuf الخاصة بها:

go
func (c *APIConfig) Build() (*commander.Config, error) {
    for _, s := range c.Services {
        switch strings.ToLower(s) {
        case "reflectionservice":
            services = append(services, serial.ToTypedMessage(&commander.ReflectionConfig{}))
        case "handlerservice":
            services = append(services, serial.ToTypedMessage(&handlerservice.Config{}))
        case "statsservice":
            services = append(services, serial.ToTypedMessage(&statsservice.Config{}))
        // ... etc
        }
    }
}

عند استخدام وضع المعالج الصادر (بدون listen)، تحتاج إلى:

  1. وارد dokodemo-door موسوم بوسم API
  2. قاعدة توجيه توجّه ذلك الوارد إلى المعالج الصادر لـ API
json
{
    "inbounds": [{
        "tag": "api-in",
        "protocol": "dokodemo-door",
        "port": 10085,
        "settings": { "address": "127.0.0.1" }
    }],
    "routing": {
        "rules": [{
            "inboundTag": ["api-in"],
            "outboundTag": "api"
        }]
    }
}

ملاحظات التنفيذ

  • يُسجّل المتحكم نفسه عبر common.RegisterConfig((*Config)(nil), ...) ويتطلب outbound.Manager من سجل الميزات.

  • سعة مخزن OutboundListener هي 4. إذا كان خادم gRPC بطيئًا في القبول و4 اتصالات في قائمة الانتظار، يتم إغلاق الاتصالات الإضافية (إسقاطها) فورًا.

  • تقوم دالة Outbound.Dispatch() بالحجب على <-closeSignal.Wait() للحفاظ على رابط النقل حيًا طوال مدة اتصال gRPC.

  • عند إيقاف المتحكم، يتم استدعاء c.server.Stop() الذي ينهي بشكل قسري جميع تدفقات gRPC النشطة. لا يوجد إيقاف تدريجي.

  • إنشاء الخدمات يستخدم مسار common.CreateObject() العام، الذي يبحث عن نوع إعداد protobuf في السجل العام. هذا يعني أنه يمكن توسيع الخدمات بواسطة حزم خارجية تسجّل إعدادات جديدة.

  • حقل listen يُفعّل وضع TCP المباشر، وهو أبسط في الإعداد لكنه يتجاوز نظام التوجيه في Xray. في هذا الوضع، لا يتم تسجيل أي معالج صادر.

تحليل تقني لأغراض إعادة التنفيذ.