Skip to content

Commander:运行时管理 gRPC API

Commander 是 Xray 内置的 gRPC 服务器,提供运行时管理 API。它支持动态配置变更、统计信息查询和系统监控,无需重启进程。

架构

mermaid
flowchart TD
    subgraph Commander
        C[Commander] --> GS[grpc.Server]
        GS --> HS[HandlerService]
        GS --> SS[StatsService]
        GS --> RS[RoutingService]
        GS --> LS[LoggerService]
        GS --> OS[ObservatoryService]
        GS --> RF[ReflectionService]
    end

    subgraph 传输方式
        GS -->|方式 A| OL[OutboundListener]
        OL --> OH[Outbound Handler]
        OH --> RT[Xray 路由]

        GS -->|方式 B| TL[TCP 监听器]
        TL --> NW[直接网络]
    end

    subgraph 客户端
        CLI[gRPC 客户端] --> RT
        CLI2[gRPC 客户端] --> NW
    end

Commander 核心

文件: app/commander/commander.go

go
type Commander struct {
    sync.Mutex
    server   *grpc.Server
    services []Service
    ohm      outbound.Manager
    tag      string
    listen   string
}

Service 接口

go
// app/commander/service.go
type Service interface {
    Register(*grpc.Server)
}

每个 gRPC 服务实现都必须满足此接口以注册其 handler。

初始化

go
func NewCommander(ctx context.Context, config *Config) (*Commander, error) {
    c := &Commander{tag: config.Tag, listen: config.Listen}

    core.RequireFeatures(ctx, func(om outbound.Manager) {
        c.ohm = om
    })

    for _, rawConfig := range config.Service {
        config, _ := rawConfig.GetInstance()             // TypedMessage -> proto.Message
        rawService, _ := common.CreateObject(ctx, config) // proto.Message -> Service
        service, _ := rawService.(Service)
        c.services = append(c.services, service)
    }
}

每个服务通过全局配置注册表(common.CreateObject)从其 Protobuf 配置创建。

启动:两种传输模式

go
func (c *Commander) Start() error {
    c.server = grpc.NewServer()
    for _, service := range c.services {
        service.Register(c.server)
    }

    if len(c.listen) > 0 {
        // 直接 TCP 监听模式
        l, _ := net.Listen("tcp", c.listen)
        go c.server.Serve(l)
        return nil
    }

    // Outbound 监听模式(通过 Xray 路由)
    listener := &OutboundListener{
        buffer: make(chan net.Conn, 4),
        done:   done.New(),
    }
    go c.server.Serve(listener)
    c.ohm.RemoveHandler(context.Background(), c.tag)
    return c.ohm.AddHandler(context.Background(), &Outbound{
        tag:      c.tag,
        listener: listener,
    })
}

模式 1——直接 TCP(设置 listen 字段): 打开一个真实的 TCP 套接字。配置更简单但暴露在网络中。

模式 2——Outbound Handler(默认): 创建虚拟的 OutboundListener 并注册 Outbound handler。gRPC 客户端通过 Xray 的 inbound/路由系统连接。

OutboundListener

文件: app/commander/outbound.go

基于连接 channel 的 net.Listener 实现:

go
type OutboundListener struct {
    buffer chan net.Conn  // 容量:4
    done   *done.Instance
}

func (l *OutboundListener) Accept() (net.Conn, error) {
    select {
    case <-l.done.Wait():
        return nil, errors.New("listen closed")
    case c := <-l.buffer:
        return c, nil
    }
}

Outbound Handler

将 Xray 传输链路转换为 net.Conn 供 gRPC 服务器使用:

go
type Outbound struct {
    tag      string
    listener *OutboundListener
    access   sync.RWMutex
    closed   bool
}

func (co *Outbound) Dispatch(ctx context.Context, link *transport.Link) {
    closeSignal := done.New()
    c := cnc.NewConnection(
        cnc.ConnectionInputMulti(link.Writer),
        cnc.ConnectionOutputMulti(link.Reader),
        cnc.ConnectionOnClose(closeSignal),
    )
    co.listener.add(c)
    <-closeSignal.Wait()  // 阻塞直到连接关闭
}

可用服务

StatsService

文件: app/stats/command/command.go

go
type statsServer struct {
    stats     feature_stats.Manager
    startTime time.Time
}

gRPC 方法:

方法说明
GetStats(name, reset)获取单个计数器的值,可选择性重置
QueryStats(pattern, reset)查询所有匹配子串模式的计数器
GetSysStats()系统统计:运行时间、goroutine 数、内存分配、GC
GetStatsOnline(name)获取用户在线 map 的在线用户数
GetStatsOnlineIpList(name)获取在线 map 的 IP 列表及时间戳
GetAllOnlineUsers()列出所有有活跃 IP 的用户

GetSysStats 方法提供运行时诊断信息:

go
func (s *statsServer) GetSysStats(ctx context.Context, request *SysStatsRequest) (*SysStatsResponse, error) {
    var rtm runtime.MemStats
    runtime.ReadMemStats(&rtm)
    return &SysStatsResponse{
        Uptime:       uint32(time.Since(s.startTime).Seconds()),
        NumGoroutine: uint32(runtime.NumGoroutine()),
        Alloc:        rtm.Alloc,
        TotalAlloc:   rtm.TotalAlloc,
        Sys:          rtm.Sys,
        Mallocs:      rtm.Mallocs,
        Frees:        rtm.Frees,
        LiveObjects:  rtm.Mallocs - rtm.Frees,
        NumGC:        rtm.NumGC,
        PauseTotalNs: rtm.PauseTotalNs,
    }, nil
}

为兼容 v2ray,StatsService 注册了两次——分别使用 xray.app.stats.command.StatsServicev2ray.core.app.stats.command.StatsService

go
func (s *service) Register(server *grpc.Server) {
    ss := NewStatsServer(s.statsManager)
    RegisterStatsServiceServer(server, ss)
    vCoreDesc := StatsService_ServiceDesc
    vCoreDesc.ServiceName = "v2ray.core.app.stats.command.StatsService"
    server.RegisterService(&vCoreDesc, ss)
}

HandlerService

包: app/proxyman/command

运行时管理 inbound 和 outbound handler:

  • 添加/移除 inbound handler
  • 添加/移除 outbound handler
  • 修改 inbound handler 设置

RoutingService

包: app/router/command

运行时路由管理:

  • 根据指定上下文测试路由规则
  • 查询路由表

LoggerService

包: app/log/command

  • 重启日志记录器
  • 通过流式传输跟踪日志输出

ObservatoryService

包: app/observatory/command

  • GetOutboundStatus():返回所有被监测出站的健康状态

ReflectionService

文件: app/commander/service.go

启用 gRPC 服务器反射以支持工具调试:

go
type reflectionService struct{}

func (r reflectionService) Register(s *grpc.Server) {
    reflection.Register(s)
}

配置

文件: infra/conf/api.go

json
{
    "api": {
        "tag": "api",
        "listen": "127.0.0.1:10085",
        "services": [
            "HandlerService",
            "StatsService",
            "RoutingService",
            "LoggerService",
            "ObservatoryService",
            "ReflectionService"
        ]
    }
}

Build() 方法将服务名称映射到对应的 Protobuf 配置:

go
func (c *APIConfig) Build() (*commander.Config, error) {
    for _, s := range c.Services {
        switch strings.ToLower(s) {
        case "reflectionservice":
            services = append(services, serial.ToTypedMessage(&commander.ReflectionConfig{}))
        case "handlerservice":
            services = append(services, serial.ToTypedMessage(&handlerservice.Config{}))
        case "statsservice":
            services = append(services, serial.ToTypedMessage(&statsservice.Config{}))
        // ... 其他服务
        }
    }
}

使用 outbound 模式(不设置 listen)时,需要:

  1. 一个标记为 API tag 的 dokodemo-door inbound
  2. 一条将该 inbound 导向 API outbound 的路由规则
json
{
    "inbounds": [{
        "tag": "api-in",
        "protocol": "dokodemo-door",
        "port": 10085,
        "settings": { "address": "127.0.0.1" }
    }],
    "routing": {
        "rules": [{
            "inboundTag": ["api-in"],
            "outboundTag": "api"
        }]
    }
}

实现要点

  • Commander 通过 common.RegisterConfig((*Config)(nil), ...) 注册自身,并从功能注册表中获取 outbound.Manager

  • OutboundListener 缓冲区容量为 4。如果 gRPC 服务器接受连接较慢且已有 4 个连接排队,额外的连接将被立即关闭(丢弃)。

  • Outbound.Dispatch() 方法通过 <-closeSignal.Wait() 阻塞,以在 gRPC 连接存续期间保持传输链路存活。

  • Commander 关闭时调用 c.server.Stop(),这将强制终止所有活跃的 gRPC 流,没有优雅关闭机制。

  • 服务创建使用通用的 common.CreateObject() 路径,该路径在全局注册表中查找 Protobuf 配置类型。这意味着第三方包可以通过注册新配置来扩展服务。

  • listen 字段启用直接 TCP 模式,配置更简单但绕过了 Xray 的路由系统。在此模式下不会注册 outbound handler。

用于重新实现目的的技术分析。